Skip to content

Commit 9102c4e

Browse files
authored
Merge pull request #3 from ApplauseOSS/PD-4823-workers
PD-4823 Use a configurable number of workers
2 parents 6934b08 + 297b22c commit 9102c4e

File tree

4 files changed

+113
-22
lines changed

4 files changed

+113
-22
lines changed

README.md

+10
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,14 @@ encrypted environment variables passed into it.
1414
This project is a replacement for the ApplauseOSS/kms-encryption-toolbox
1515
supplied shell script, `decrypt-and-start`.
1616

17+
It can be run as:
1718

19+
```bash
20+
$ decrypt-and-start some other program
21+
```
22+
23+
It can also take an optional flag to control the number of parallel workers:
24+
25+
```bash
26+
$ decrypt-and-start -p 20 -- some other program
27+
```

VERSION

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.1.0
1+
0.2.0

decrypt-and-start.go

+32-21
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package main
22

33
import (
4-
"encoding/base64"
54
"flag"
65
"fmt"
76
"github.com/applauseoss/decrypt-and-start/lib"
8-
enc_sdk "github.com/applauseoss/decrypt-and-start/lib/aws_encryption_sdk"
97
"log"
108
"os"
119
"os/exec"
@@ -15,36 +13,49 @@ import (
1513

1614
// This function should work like an entrypoint: exec "${@}"
1715
func Exec() {
18-
flag.Parse()
19-
if len(os.Args) == 1 {
16+
args := flag.Args()
17+
if len(args) == 0 {
2018
return
2119
}
22-
cmd, err := exec.LookPath(os.Args[1])
20+
cmd, err := exec.LookPath(args[0])
2321
if err != nil {
2422
log.Fatal(err)
2523
}
26-
if err := syscall.Exec(cmd, flag.Args(), os.Environ()); err != nil {
24+
if err := syscall.Exec(cmd, args, os.Environ()); err != nil {
2725
log.Fatal(err)
2826
}
2927
}
3028

3129
func main() {
32-
for _, e := range os.Environ() {
33-
// e = each k=v pair/line, pair = split k = [0], v = [1] array
34-
pair := strings.SplitN(e, "=", 2)
35-
// See if value starts with 'decrypt:'
36-
if strings.HasPrefix(pair[1], "decrypt:") {
37-
fmt.Println("Decrypting the value of " + pair[0] + "...")
38-
ciphertext, err := base64.StdEncoding.DecodeString(strings.TrimPrefix(pair[1], "decrypt:"))
39-
if err != nil {
40-
log.Fatal(err)
41-
}
42-
kms_helper := enc_sdk.NewKmsHelper(lib.GetRegion())
43-
decrypted_value, err := kms_helper.Decrypt(ciphertext)
44-
if err != nil {
45-
log.Fatal(err)
30+
var workerCount int
31+
flag.IntVar(&workerCount, "p", 10, "number of parallel workers (defaults to 10)")
32+
flag.Parse()
33+
workerPool := lib.NewWorkerPool(workerCount)
34+
workerPool.Start()
35+
// Put encrypted env vars in queue for workers to process
36+
go func() {
37+
for _, e := range os.Environ() {
38+
// e = each k=v pair/line, pair = split k = [0], v = [1] array
39+
pair := strings.SplitN(e, "=", 2)
40+
// See if value starts with 'decrypt:'
41+
if strings.HasPrefix(pair[1], "decrypt:") {
42+
env := &lib.EnvVar{Name: pair[0], Value: pair[1]}
43+
workerPool.InChan <- env
44+
fmt.Println("Decrypting the value of " + pair[0] + "...")
4645
}
47-
os.Setenv(pair[0], string(decrypted_value))
46+
}
47+
// Close the input channel so workers know there's nothing left to process
48+
close(workerPool.InChan)
49+
}()
50+
// Process decrypted values
51+
for {
52+
env, ok := <-workerPool.OutChan
53+
if env != nil {
54+
os.Setenv(env.Name, env.Value)
55+
}
56+
// If the output channel is closed, there are no more values to receive
57+
if !ok {
58+
break
4859
}
4960
}
5061
Exec()

lib/worker.go

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package lib
2+
3+
import (
4+
"encoding/base64"
5+
enc_sdk "github.com/applauseoss/decrypt-and-start/lib/aws_encryption_sdk"
6+
"log"
7+
"strings"
8+
)
9+
10+
type EnvVar struct {
11+
Name string
12+
Value string
13+
}
14+
15+
type WorkerPool struct {
16+
InChan chan *EnvVar
17+
OutChan chan *EnvVar
18+
workerCount int
19+
doneChan chan bool
20+
}
21+
22+
func NewWorkerPool(count int) *WorkerPool {
23+
w := &WorkerPool{workerCount: count}
24+
w.InChan = make(chan *EnvVar)
25+
w.OutChan = make(chan *EnvVar)
26+
w.doneChan = make(chan bool)
27+
return w
28+
}
29+
30+
func (w *WorkerPool) Start() {
31+
for i := 0; i < w.workerCount; i++ {
32+
go func() {
33+
kmsHelper := enc_sdk.NewKmsHelper(GetRegion())
34+
for {
35+
env, ok := <-w.InChan
36+
if env != nil {
37+
ciphertext, err := base64.StdEncoding.DecodeString(strings.TrimPrefix(env.Value, "decrypt:"))
38+
if err != nil {
39+
log.Fatal(err)
40+
}
41+
decrypted_value, err := kmsHelper.Decrypt(ciphertext)
42+
if err != nil {
43+
log.Fatal(err)
44+
}
45+
env.Value = string(decrypted_value)
46+
w.OutChan <- env
47+
}
48+
if !ok {
49+
break
50+
}
51+
}
52+
w.doneChan <- true
53+
}()
54+
}
55+
// Wait for workers to finish
56+
go func() {
57+
remainingWorkers := w.workerCount
58+
for {
59+
done := <-w.doneChan
60+
if done {
61+
remainingWorkers--
62+
if remainingWorkers == 0 {
63+
break
64+
}
65+
}
66+
}
67+
// Close the output channel when all workers have finished
68+
close(w.OutChan)
69+
}()
70+
}

0 commit comments

Comments
 (0)