from random import randint
from math import log
-from nmigen import Module, Signal, Cat, Value
+from nmigen import Module, Signal, Cat, Value, Elaboratable
from nmigen.compat.sim import run_simulation
from nmigen.cli import verilog, rtlil
-from multipipe import CombMultiOutPipeline
+from multipipe import CombMultiOutPipeline, CombMuxOutPipe
from multipipe import PriorityCombMuxInPipe
-from singlepipe import UnbufferedPipeline
+from singlepipe import SimpleHandshake, RecordObject, Object
-class MuxCombPipeline(CombMultiOutPipeline):
- def __init__(self, stage, n_len):
- # HACK: stage is also the n-way multiplexer
- CombMultiOutPipeline.__init__(self, stage, n_len=n_len, n_mux=stage)
-
- # HACK: n-mux is also the stage... so set the muxid equal to input mid
- stage.m_id = self.p.i_data.mid
-
- def ports(self):
- return self.p_mux.ports()
-
-
-class PassData: # (Value):
+class PassData2(RecordObject):
def __init__(self):
+ RecordObject.__init__(self)
self.mid = Signal(2, reset_less=True)
self.idx = Signal(8, reset_less=True)
self.data = Signal(16, reset_less=True)
- def _rhs_signals(self):
- return self.ports()
-
- def shape(self):
- bits, sign = 0, False
- for elem_bits, elem_sign in (elem.shape() for elem in self.ports()):
- bits = max(bits, elem_bits + elem_sign)
- sign = max(sign, elem_sign)
- return bits, sign
- def eq(self, i):
- return [self.mid.eq(i.mid), self.idx.eq(i.idx), self.data.eq(i.data)]
+class PassData(Object):
+ def __init__(self):
+ Object.__init__(self)
+ self.mid = Signal(2, reset_less=True)
+ self.idx = Signal(8, reset_less=True)
+ self.data = Signal(16, reset_less=True)
- def ports(self):
- return [self.mid, self.idx, self.data]
class PassThroughStage:
-class PassThroughPipe(UnbufferedPipeline):
+class PassThroughPipe(SimpleHandshake):
def __init__(self):
- UnbufferedPipeline.__init__(self, PassThroughStage())
+ SimpleHandshake.__init__(self, PassThroughStage())
class InputTest:
for i in range(self.tlen):
op2 = self.di[mid][i]
rs = dut.p[mid]
- yield rs.i_valid.eq(1)
- yield rs.i_data.data.eq(op2)
- yield rs.i_data.idx.eq(i)
- yield rs.i_data.mid.eq(mid)
+ yield rs.valid_i.eq(1)
+ yield rs.data_i.data.eq(op2)
+ yield rs.data_i.idx.eq(i)
+ yield rs.data_i.mid.eq(mid)
yield
- o_p_ready = yield rs.o_ready
+ o_p_ready = yield rs.ready_o
while not o_p_ready:
yield
- o_p_ready = yield rs.o_ready
+ o_p_ready = yield rs.ready_o
print ("send", mid, i, hex(op2))
- yield rs.i_valid.eq(0)
+ yield rs.valid_i.eq(0)
# wait random period of time before queueing another value
for i in range(randint(0, 3)):
yield
- yield rs.i_valid.eq(0)
+ yield rs.valid_i.eq(0)
yield
print ("send ended", mid)
#stall_range = randint(0, 3)
#for j in range(randint(1,10)):
# stall = randint(0, stall_range) != 0
- # yield self.dut.n[0].i_ready.eq(stall)
+ # yield self.dut.n[0].ready_i.eq(stall)
# yield
n = self.dut.n[mid]
- yield n.i_ready.eq(1)
+ yield n.ready_i.eq(1)
yield
- o_n_valid = yield n.o_valid
- i_n_ready = yield n.i_ready
+ o_n_valid = yield n.valid_o
+ i_n_ready = yield n.ready_i
if not o_n_valid or not i_n_ready:
continue
- out_mid = yield n.o_data.mid
- out_i = yield n.o_data.idx
- out_v = yield n.o_data.data
+ out_mid = yield n.data_o.mid
+ out_i = yield n.data_o.idx
+ out_v = yield n.data_o.data
print ("recv", out_mid, out_i, hex(out_v))
stage = PassThroughStage()
PriorityCombMuxInPipe.__init__(self, stage, p_len=self.num_rows)
- def ports(self):
- res = []
- for i in range(len(self.p)):
- res += [self.p[i].i_valid, self.p[i].o_ready] + \
- self.p[i].i_data.ports()
- res += [self.n.i_ready, self.n.o_valid] + \
- self.n.o_data.ports()
- return res
-
-
class OutputTest:
def __init__(self, dut):
op2 = self.di[i][0]
mid = self.di[i][1]
rs = dut.p
- yield rs.i_valid.eq(1)
- yield rs.i_data.data.eq(op2)
- yield rs.i_data.mid.eq(mid)
+ yield rs.valid_i.eq(1)
+ yield rs.data_i.data.eq(op2)
+ yield rs.data_i.mid.eq(mid)
yield
- o_p_ready = yield rs.o_ready
+ o_p_ready = yield rs.ready_o
while not o_p_ready:
yield
- o_p_ready = yield rs.o_ready
+ o_p_ready = yield rs.ready_o
print ("send", mid, i, hex(op2))
- yield rs.i_valid.eq(0)
+ yield rs.valid_i.eq(0)
# wait random period of time before queueing another value
for i in range(randint(0, 3)):
yield
- yield rs.i_valid.eq(0)
+ yield rs.valid_i.eq(0)
-class TestMuxOutPipe(MuxCombPipeline):
+class TestMuxOutPipe(CombMuxOutPipe):
def __init__(self, num_rows):
self.num_rows = num_rows
stage = PassThroughStage()
- MuxCombPipeline.__init__(self, stage, n_len=self.num_rows)
-
- def ports(self):
- res = [self.p.i_valid, self.p.o_ready] + \
- self.p.i_data.ports()
- for i in range(len(self.n)):
- res += [self.n[i].i_ready, self.n[i].o_valid] + \
- self.n[i].o_data.ports()
- return res
+ CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows)
-class TestInOutPipe:
+class TestInOutPipe(Elaboratable):
def __init__(self, num_rows=4):
self.num_rows = num_rows
self.inpipe = TestPriorityMuxPipe(num_rows) # fan-in (combinatorial)