X-Git-Url: https://git.libre-soc.org/?a=blobdiff_plain;f=src%2Fadd%2Ftest_inout_mux_pipe.py;h=35abe2eaf46b3aa148826b6821640cf39dfe2786;hb=6bff1a997f3846872cf489c24b5c01426c4dc97c;hp=57b5676a018ac6835e763800da9ba0ec7b77d375;hpb=adbbd779461acb1ac334be219958fbb457318899;p=ieee754fpu.git diff --git a/src/add/test_inout_mux_pipe.py b/src/add/test_inout_mux_pipe.py index 57b5676a..35abe2ea 100644 --- a/src/add/test_inout_mux_pipe.py +++ b/src/add/test_inout_mux_pipe.py @@ -1,4 +1,4 @@ -""" key strategic example showing how to do multi-input fan-in into a +""" key strategic example showing how to do multi-input fan-in into a multi-stage pipeline, then multi-output fanout. the multiplex ID from the fan-in is passed in to the pipeline, preserved, @@ -7,58 +7,30 @@ from random import randint from math import log -from nmigen import Module, Signal, Cat, Value +from nmigen import Module, Signal, Cat, Value, Elaboratable from nmigen.compat.sim import run_simulation from nmigen.cli import verilog, rtlil -from multipipe import CombMultiOutPipeline -from multipipe import CombMultiInPipeline, InputPriorityArbiter -from singlepipe import UnbufferedPipeline +from multipipe import CombMultiOutPipeline, CombMuxOutPipe +from multipipe import PriorityCombMuxInPipe +from singlepipe import SimpleHandshake, RecordObject, Object -class PriorityUnbufferedPipeline(CombMultiInPipeline): - def __init__(self, stage, p_len): - p_mux = InputPriorityArbiter(self, p_len) - CombMultiInPipeline.__init__(self, stage, p_len=p_len, p_mux=p_mux) - - def ports(self): - return self.p_mux.ports() - #return UnbufferedPipeline.ports(self) + self.p_mux.ports() - - -class MuxUnbufferedPipeline(CombMultiOutPipeline): - def __init__(self, stage, n_len): - # HACK: stage is also the n-way multiplexer - CombMultiOutPipeline.__init__(self, stage, n_len=n_len, n_mux=stage) - - # HACK: n-mux is also the stage... so set the muxid equal to input mid - stage.m_id = self.p.i_data.mid - - def ports(self): - return self.p_mux.ports() - - -class PassData: # (Value): +class PassData2(RecordObject): def __init__(self): + RecordObject.__init__(self) self.mid = Signal(2, reset_less=True) self.idx = Signal(8, reset_less=True) self.data = Signal(16, reset_less=True) - def _rhs_signals(self): - return self.ports() - def shape(self): - bits, sign = 0, False - for elem_bits, elem_sign in (elem.shape() for elem in self.ports()): - bits = max(bits, elem_bits + elem_sign) - sign = max(sign, elem_sign) - return bits, sign - - def eq(self, i): - return [self.mid.eq(i.mid), self.idx.eq(i.idx), self.data.eq(i.data)] +class PassData(Object): + def __init__(self): + Object.__init__(self) + self.mid = Signal(2, reset_less=True) + self.idx = Signal(8, reset_less=True) + self.data = Signal(16, reset_less=True) - def ports(self): - return [self.mid, self.idx, self.data] class PassThroughStage: @@ -66,15 +38,15 @@ class PassThroughStage: return PassData() def ospec(self): return self.ispec() # same as ospec - + def process(self, i): return i # pass-through -class PassThroughPipe(UnbufferedPipeline): +class PassThroughPipe(SimpleHandshake): def __init__(self): - UnbufferedPipeline.__init__(self, PassThroughStage()) + SimpleHandshake.__init__(self, PassThroughStage()) class InputTest: @@ -94,24 +66,24 @@ class InputTest: for i in range(self.tlen): op2 = self.di[mid][i] rs = dut.p[mid] - yield rs.i_valid.eq(1) - yield rs.i_data.data.eq(op2) - yield rs.i_data.idx.eq(i) - yield rs.i_data.mid.eq(mid) + yield rs.valid_i.eq(1) + yield rs.data_i.data.eq(op2) + yield rs.data_i.idx.eq(i) + yield rs.data_i.mid.eq(mid) yield - o_p_ready = yield rs.o_ready + o_p_ready = yield rs.ready_o while not o_p_ready: yield - o_p_ready = yield rs.o_ready + o_p_ready = yield rs.ready_o print ("send", mid, i, hex(op2)) - yield rs.i_valid.eq(0) + yield rs.valid_i.eq(0) # wait random period of time before queueing another value for i in range(randint(0, 3)): yield - yield rs.i_valid.eq(0) + yield rs.valid_i.eq(0) yield print ("send ended", mid) @@ -131,19 +103,19 @@ class InputTest: #stall_range = randint(0, 3) #for j in range(randint(1,10)): # stall = randint(0, stall_range) != 0 - # yield self.dut.n[0].i_ready.eq(stall) + # yield self.dut.n[0].ready_i.eq(stall) # yield n = self.dut.n[mid] - yield n.i_ready.eq(1) + yield n.ready_i.eq(1) yield - o_n_valid = yield n.o_valid - i_n_ready = yield n.i_ready + o_n_valid = yield n.valid_o + i_n_ready = yield n.ready_i if not o_n_valid or not i_n_ready: continue - out_mid = yield n.o_data.mid - out_i = yield n.o_data.idx - out_v = yield n.o_data.data + out_mid = yield n.data_o.mid + out_i = yield n.data_o.idx + out_v = yield n.data_o.data print ("recv", out_mid, out_i, hex(out_v)) @@ -160,21 +132,11 @@ class InputTest: print ("recv ended", mid) -class TestPriorityMuxPipe(PriorityUnbufferedPipeline): +class TestPriorityMuxPipe(PriorityCombMuxInPipe): def __init__(self, num_rows): self.num_rows = num_rows stage = PassThroughStage() - PriorityUnbufferedPipeline.__init__(self, stage, p_len=self.num_rows) - - def ports(self): - res = [] - for i in range(len(self.p)): - res += [self.p[i].i_valid, self.p[i].o_ready] + \ - self.p[i].i_data.ports() - res += [self.n.i_ready, self.n.o_valid] + \ - self.n.o_data.ports() - return res - + PriorityCombMuxInPipe.__init__(self, stage, p_len=self.num_rows) class OutputTest: @@ -195,41 +157,33 @@ class OutputTest: op2 = self.di[i][0] mid = self.di[i][1] rs = dut.p - yield rs.i_valid.eq(1) - yield rs.i_data.data.eq(op2) - yield rs.i_data.mid.eq(mid) + yield rs.valid_i.eq(1) + yield rs.data_i.data.eq(op2) + yield rs.data_i.mid.eq(mid) yield - o_p_ready = yield rs.o_ready + o_p_ready = yield rs.ready_o while not o_p_ready: yield - o_p_ready = yield rs.o_ready + o_p_ready = yield rs.ready_o print ("send", mid, i, hex(op2)) - yield rs.i_valid.eq(0) + yield rs.valid_i.eq(0) # wait random period of time before queueing another value for i in range(randint(0, 3)): yield - yield rs.i_valid.eq(0) + yield rs.valid_i.eq(0) -class TestMuxOutPipe(MuxUnbufferedPipeline): +class TestMuxOutPipe(CombMuxOutPipe): def __init__(self, num_rows): self.num_rows = num_rows stage = PassThroughStage() - MuxUnbufferedPipeline.__init__(self, stage, n_len=self.num_rows) - - def ports(self): - res = [self.p.i_valid, self.p.o_ready] + \ - self.p.i_data.ports() - for i in range(len(self.n)): - res += [self.n[i].i_ready, self.n[i].o_valid] + \ - self.n[i].o_data.ports() - return res + CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows) -class TestInOutPipe: +class TestInOutPipe(Elaboratable): def __init__(self, num_rows=4): self.num_rows = num_rows self.inpipe = TestPriorityMuxPipe(num_rows) # fan-in (combinatorial)