-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmain.go
150 lines (129 loc) · 3.03 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package main
import (
"fmt"
"strconv"
"time"
"math/rand"
)
//声明成游戏
type Payload struct {
name string
}
//打游戏
func (p *Payload) Play() {
fmt.Printf("%s 打LOL游戏...当前任务完成\n",p.name)
}
//任务
type Job struct {
Payload Payload
}
//任务队列
var JobQueue chan Job
// 工人
type Worker struct {
name string //工人的名字
// WorkerPool chan JobQueue //对象池
WorkerPool chan chan Job//对象池
JobChannel chan Job //通道里面拿
quit chan bool //
}
// 新建一个工人
func NewWorker(workerPool chan chan Job,name string) Worker{
fmt.Printf("创建了一个工人,它的名字是:%s \n",name);
return Worker{
name:name,//工人的名字
WorkerPool: workerPool, //工人在哪个对象池里工作,可以理解成部门
JobChannel:make(chan Job),//工人的任务
quit:make(chan bool),
}
}
// 工人开始工作
func (w *Worker) Start(){
//开一个新的协程
go func(){
for{
//注册到对象池中,
w.WorkerPool <-w.JobChannel
fmt.Printf("[%s]把自己注册到 对象池中 \n",w.name)
select {
//接收到了新的任务
case job :=<- w.JobChannel:
fmt.Printf("[%s] 工人接收到了任务 当前任务的长度是[%d]\n",w.name,len(w.WorkerPool))
job.Payload.Play()
time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
//接收到了任务
case <-w.quit:
return
}
}
}()
}
func (w Worker) Stop(){
go func(){
w.quit <- true
}()
}
type Dispatcher struct {
//WorkerPool chan JobQueue
name string //调度的名字
maxWorkers int //获取 调试的大小
WorkerPool chan chan Job //注册和工人一样的通道
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool :=make(chan chan Job,maxWorkers)
return &Dispatcher{
WorkerPool:pool,// 将工人放到一个池中,可以理解成一个部门中
name:"调度者",//调度者的名字
maxWorkers:maxWorkers,//这个调度者有好多个工人
}
}
func (d *Dispatcher) Run(){
// 开始运行
for i :=0;i<d.maxWorkers;i++{
worker := NewWorker(d.WorkerPool,fmt.Sprintf("work-%s",strconv.Itoa(i)))
//开始工作
worker.Start()
}
//监控
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job :=<-JobQueue:
fmt.Println("调度者,接收到一个工作任务")
time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
// 调度者接收到一个工作任务
go func (job Job) {
//从现有的对象池中拿出一个
jobChannel := <-d.WorkerPool
jobChannel <- job
}(job)
default:
//fmt.Println("ok!!")
}
}
}
func initialize() {
maxWorkers := 2;
maxQueue := 4;
//初始化一个调试者,并指定它可以操作的 工人个数
dispatch := NewDispatcher(maxWorkers)
JobQueue =make(chan Job,maxQueue) //指定任务的队列长度
//并让它一直接运行
dispatch.Run()
}
func main() {
//初始化对象池
initialize()
for i:=0;i<10;i++{
p := Payload{
fmt.Sprintf("玩家-[%s]",strconv.Itoa(i)),
}
JobQueue <- Job{
Payload:p,
}
time.Sleep(time.Second)
}
close(JobQueue)
}