-
Notifications
You must be signed in to change notification settings - Fork 72
/
Copy pathanalysis.py
147 lines (127 loc) · 4.6 KB
/
analysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import math
from migen.fhdl.std import *
from migen.flow.actor import *
from migen.bank.description import *
from migen.genlib.cdc import MultiReg
from migen.genlib.record import Record
from migen.genlib.fifo import AsyncFIFO
from gateware.csc.rgb2ycbcr import RGB2YCbCr
from gateware.csc.ycbcr444to422 import YCbCr444to422
# TODO: Add tests to validate FrameExtraction module
class FrameExtraction(Module, AutoCSR):
def __init__(self, word_width, fifo_depth):
# in pix clock domain
self.valid_i = Signal()
self.vsync = Signal()
self.de = Signal()
self.r = Signal(8)
self.g = Signal(8)
self.b = Signal(8)
self.counter = Signal(math.ceil(math.log2(1024*768)))
word_layout = [("sof", 1), ("pixels", word_width)]
self.frame = Source(word_layout)
self.busy = Signal()
self._overflow = CSR()
self._start_counter = CSRStorage(1, reset=0)
self.sync += [
If(self._start_counter.storage,
self.counter.eq(self.counter + 1)
)
]
de_r = Signal()
self.sync.pix += de_r.eq(self.de)
rgb2ycbcr = RGB2YCbCr()
self.submodules += RenameClockDomains(rgb2ycbcr, "pix")
chroma_downsampler = YCbCr444to422()
self.submodules += RenameClockDomains(chroma_downsampler, "pix")
self.comb += [
rgb2ycbcr.sink.stb.eq(self.valid_i),
rgb2ycbcr.sink.sop.eq(self.de & ~de_r),
rgb2ycbcr.sink.r.eq(self.r),
rgb2ycbcr.sink.g.eq(self.g),
rgb2ycbcr.sink.b.eq(self.b),
Record.connect(rgb2ycbcr.source, chroma_downsampler.sink),
chroma_downsampler.source.ack.eq(1)
]
# XXX need clean up
de = self.de
vsync = self.vsync
for i in range(rgb2ycbcr.latency + chroma_downsampler.latency):
next_de = Signal()
next_vsync = Signal()
self.sync.pix += [
next_de.eq(de),
next_vsync.eq(vsync)
]
de = next_de
vsync = next_vsync
# start of frame detection
vsync_r = Signal()
new_frame = Signal()
self.comb += new_frame.eq(vsync & ~vsync_r)
self.sync.pix += vsync_r.eq(vsync)
# pack pixels into words
cur_word = Signal(word_width)
cur_word_valid = Signal()
encoded_pixel = Signal(16)
self.comb += encoded_pixel.eq(Cat(chroma_downsampler.source.y, chroma_downsampler.source.cb_cr)),
pack_factor = word_width//16
assert(pack_factor & (pack_factor - 1) == 0) # only support powers of 2
pack_counter = Signal(max=pack_factor)
self.sync.pix += [
cur_word_valid.eq(0),
If(new_frame,
cur_word_valid.eq(pack_counter == (pack_factor - 1)),
pack_counter.eq(0),
).Elif(chroma_downsampler.source.stb & de,
[If(pack_counter == (pack_factor-i-1),
cur_word[16*i:16*(i+1)].eq(encoded_pixel)) for i in range(pack_factor)],
cur_word_valid.eq(pack_counter == (pack_factor - 1)),
pack_counter.eq(pack_counter + 1)
)
]
# FIFO
fifo = RenameClockDomains(AsyncFIFO(word_layout, fifo_depth),
{"write": "pix", "read": "sys"})
self.submodules += fifo
self.comb += [
fifo.din.pixels.eq(cur_word),
fifo.we.eq(cur_word_valid)
]
self.sync.pix += \
If(new_frame,
fifo.din.sof.eq(1)
).Elif(cur_word_valid,
fifo.din.sof.eq(0)
)
self.comb += [
self.frame.stb.eq(fifo.readable),
self.frame.payload.eq(fifo.dout),
fifo.re.eq(self.frame.ack),
self.busy.eq(0)
]
# overflow detection
pix_overflow = Signal()
pix_overflow_reset = Signal()
self.sync.pix += [
If(fifo.we & ~fifo.writable,
pix_overflow.eq(1)
).Elif(pix_overflow_reset,
pix_overflow.eq(0)
)
]
sys_overflow = Signal()
self.specials += MultiReg(pix_overflow, sys_overflow)
self.comb += [
pix_overflow_reset.eq(self._overflow.re),
]
overflow_mask = Signal()
self.comb += [
self._overflow.w.eq(sys_overflow & ~overflow_mask),
]
self.sync += \
If(self._overflow.re,
overflow_mask.eq(1)
).Elif(pix_overflow_reset,
overflow_mask.eq(0)
)