from nmigen.compat.sim import run_simulation
from nmigen.cli import verilog, rtlil
-from multipipe import CombMultiOutPipeline
-
-
-class MuxUnbufferedPipeline(CombMultiOutPipeline):
- def __init__(self, stage, n_len):
- # HACK: stage is also the n-way multiplexer
- CombMultiOutPipeline.__init__(self, stage, n_len=n_len, n_mux=stage)
-
- # HACK: n-mux is also the stage... so set the muxid equal to input mid
- stage.m_id = self.p.i_data.mid
-
- def ports(self):
- return self.p_mux.ports()
+from multipipe import CombMuxOutPipe
+from singlepipe import SimpleHandshake
class PassInData:
return PassInData()
def ospec(self):
- return Signal(16, reset_less=True)
+ return Signal(16, name="data_out", reset_less=True)
def process(self, i):
return i.data
+class PassThroughDataStage:
+ def ispec(self):
+ return PassInData()
+ def ospec(self):
+ return self.ispec() # same as ospec
+
+ def process(self, i):
+ return i # pass-through
+
+
+
+class PassThroughPipe(SimpleHandshake):
+ def __init__(self):
+ SimpleHandshake.__init__(self, PassThroughDataStage())
+
+
+
def testbench(dut):
stb = yield dut.out_op.stb
self.dut = dut
self.di = []
self.do = {}
- self.tlen = 3
+ self.tlen = 10
for i in range(self.tlen * dut.num_rows):
- mid = randint(0, dut.num_rows-1)
+ if i < dut.num_rows:
+ mid = i
+ else:
+ mid = randint(0, dut.num_rows-1)
data = randint(0, 255) + (mid<<8)
if mid not in self.do:
self.do[mid] = []
def rcv(self, mid):
out_i = 0
count = 0
- while out_i != self.tlen:
+ stall_range = randint(0, 3)
+ while out_i != len(self.do[mid]):
count += 1
- if count == 1000:
- break
- #stall_range = randint(0, 3)
- #for j in range(randint(1,10)):
- # stall = randint(0, stall_range) != 0
- # yield self.dut.n[0].i_ready.eq(stall)
- # yield
+ assert count != 2000, "timeout: too long"
n = self.dut.n[mid]
yield n.i_ready.eq(1)
yield
out_i += 1
+ if randint(0, 5) == 0:
+ stall_range = randint(0, 3)
+ stall = randint(0, stall_range) != 0
+ if stall:
+ yield n.i_ready.eq(0)
+ for i in range(stall_range):
+ yield
+
-class TestPriorityMuxPipe(MuxUnbufferedPipeline):
+class TestPriorityMuxPipe(CombMuxOutPipe):
+ def __init__(self, num_rows):
+ self.num_rows = num_rows
+ stage = PassThroughStage()
+ CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows)
+
+
+class TestSyncToPriorityPipe:
def __init__(self):
self.num_rows = 4
- stage = PassThroughStage()
- MuxUnbufferedPipeline.__init__(self, stage, n_len=self.num_rows)
+ self.pipe = PassThroughPipe()
+ self.muxpipe = TestPriorityMuxPipe(self.num_rows)
+
+ self.p = self.pipe.p
+ self.n = self.muxpipe.n
+
+ def elaborate(self, platform):
+ m = Module()
+ m.submodules += self.pipe
+ m.submodules += self.muxpipe
+ m.d.comb += self.pipe.n.connect_to_next(self.muxpipe.p)
+ return m
def ports(self):
res = [self.p.i_valid, self.p.o_ready] + \
if __name__ == '__main__':
- dut = TestPriorityMuxPipe()
+ dut = TestSyncToPriorityPipe()
vl = rtlil.convert(dut, ports=dut.ports())
with open("test_outmux_pipe.il", "w") as f:
f.write(vl)