X-Git-Url: https://git.libre-soc.org/?a=blobdiff_plain;f=src%2Fadd%2Ftest_outmux_pipe.py;h=b674a87069f82394ad771a5f658d8ee0838721e5;hb=6bff1a997f3846872cf489c24b5c01426c4dc97c;hp=837a1eb25683d8026a010b3a31f3cb5a2f296243;hpb=ccd1388e07261d040ede5f0bf347313fd8a29cd1;p=ieee754fpu.git diff --git a/src/add/test_outmux_pipe.py b/src/add/test_outmux_pipe.py index 837a1eb2..b674a870 100644 --- a/src/add/test_outmux_pipe.py +++ b/src/add/test_outmux_pipe.py @@ -1,140 +1,46 @@ from random import randint from math import log -from nmigen import Module, Signal, Cat +from nmigen import Module, Signal, Cat, Elaboratable from nmigen.compat.sim import run_simulation from nmigen.cli import verilog, rtlil -from multipipe import CombMultiOutPipeline +from multipipe import CombMuxOutPipe +from singlepipe import SimpleHandshake, PassThroughHandshake, RecordObject -class MuxUnbufferedPipeline(CombMultiOutPipeline): - def __init__(self, stage, n_len): - # HACK: stage is also the n-way multiplexer - CombMultiOutPipeline.__init__(self, stage, n_len=n_len, n_mux=stage) - - # HACK: n-mux is also the stage... so set the muxid equal to input mid - stage.m_id = self.p.i_data.mid - - def ports(self): - return self.p_mux.ports() - - -class PassInData: +class PassInData(RecordObject): def __init__(self): + RecordObject.__init__(self) self.mid = Signal(2, reset_less=True) self.data = Signal(16, reset_less=True) - def eq(self, i): - return [self.mid.eq(i.mid), self.data.eq(i.data)] - - def ports(self): - return [self.mid, self.data] - class PassThroughStage: def ispec(self): return PassInData() - def ospec(self): - return Signal(16, reset_less=True) + def ospec(self, name): + return Signal(16, name="%s_dout" % name, reset_less=True) def process(self, i): return i.data +class PassThroughDataStage: + def ispec(self): + return PassInData() + def ospec(self): + return self.ispec() # same as ospec + + def process(self, i): + return i # pass-through + -def testbench(dut): - stb = yield dut.out_op.stb - assert stb == 0 - ack = yield dut.out_op.ack - assert ack == 0 - - # set row 1 input 0 - yield dut.rs[1].in_op[0].eq(5) - yield dut.rs[1].stb.eq(0b01) # strobe indicate 1st op ready - #yield dut.rs[1].ack.eq(1) - yield - - # check row 1 output (should be inactive) - decode = yield dut.rs[1].out_decode - assert decode == 0 - if False: - op0 = yield dut.rs[1].out_op[0] - op1 = yield dut.rs[1].out_op[1] - assert op0 == 0 and op1 == 0 - - # output should be inactive - out_stb = yield dut.out_op.stb - assert out_stb == 1 - - # set row 0 input 1 - yield dut.rs[1].in_op[1].eq(6) - yield dut.rs[1].stb.eq(0b11) # strobe indicate both ops ready - - # set acknowledgement of output... takes 1 cycle to respond - yield dut.out_op.ack.eq(1) - yield - yield dut.out_op.ack.eq(0) # clear ack on output - yield dut.rs[1].stb.eq(0) # clear row 1 strobe - - # output strobe should be active, MID should be 0 until "ack" is set... - out_stb = yield dut.out_op.stb - assert out_stb == 1 - out_mid = yield dut.mid - assert out_mid == 0 - - # ... and output should not yet be passed through either - op0 = yield dut.out_op.v[0] - op1 = yield dut.out_op.v[1] - assert op0 == 0 and op1 == 0 - - # wait for out_op.ack to activate... - yield dut.rs[1].stb.eq(0b00) # set row 1 strobes to zero - yield - - # *now* output should be passed through - op0 = yield dut.out_op.v[0] - op1 = yield dut.out_op.v[1] - assert op0 == 5 and op1 == 6 - - # set row 2 input - yield dut.rs[2].in_op[0].eq(3) - yield dut.rs[2].in_op[1].eq(4) - yield dut.rs[2].stb.eq(0b11) # strobe indicate 1st op ready - yield dut.out_op.ack.eq(1) # set output ack - yield - yield dut.rs[2].stb.eq(0) # clear row 2 strobe - yield dut.out_op.ack.eq(0) # set output ack - yield - op0 = yield dut.out_op.v[0] - op1 = yield dut.out_op.v[1] - assert op0 == 3 and op1 == 4, "op0 %d op1 %d" % (op0, op1) - out_mid = yield dut.mid - assert out_mid == 2 - - # set row 0 and 3 input - yield dut.rs[0].in_op[0].eq(9) - yield dut.rs[0].in_op[1].eq(8) - yield dut.rs[0].stb.eq(0b11) # strobe indicate 1st op ready - yield dut.rs[3].in_op[0].eq(1) - yield dut.rs[3].in_op[1].eq(2) - yield dut.rs[3].stb.eq(0b11) # strobe indicate 1st op ready - - # set acknowledgement of output... takes 1 cycle to respond - yield dut.out_op.ack.eq(1) - yield - yield dut.rs[0].stb.eq(0) # clear row 1 strobe - yield - out_mid = yield dut.mid - assert out_mid == 0, "out mid %d" % out_mid - - yield - yield dut.rs[3].stb.eq(0) # clear row 1 strobe - yield dut.out_op.ack.eq(0) # clear ack on output - yield - out_mid = yield dut.mid - assert out_mid == 3, "out mid %d" % out_mid + +class PassThroughPipe(PassThroughHandshake): + def __init__(self): + PassThroughHandshake.__init__(self, PassThroughDataStage()) class OutputTest: @@ -159,23 +65,23 @@ class OutputTest: op2 = self.di[i][0] mid = self.di[i][1] rs = dut.p - yield rs.i_valid.eq(1) - yield rs.i_data.data.eq(op2) - yield rs.i_data.mid.eq(mid) + yield rs.valid_i.eq(1) + yield rs.data_i.data.eq(op2) + yield rs.data_i.mid.eq(mid) yield - o_p_ready = yield rs.o_ready + o_p_ready = yield rs.ready_o while not o_p_ready: yield - o_p_ready = yield rs.o_ready + o_p_ready = yield rs.ready_o print ("send", mid, i, hex(op2)) - yield rs.i_valid.eq(0) + yield rs.valid_i.eq(0) # wait random period of time before queueing another value for i in range(randint(0, 3)): yield - yield rs.i_valid.eq(0) + yield rs.valid_i.eq(0) def rcv(self, mid): out_i = 0 @@ -185,14 +91,14 @@ class OutputTest: count += 1 assert count != 2000, "timeout: too long" n = self.dut.n[mid] - yield n.i_ready.eq(1) + yield n.ready_i.eq(1) yield - o_n_valid = yield n.o_valid - i_n_ready = yield n.i_ready + o_n_valid = yield n.valid_o + i_n_ready = yield n.ready_i if not o_n_valid or not i_n_ready: continue - out_v = yield n.o_data + out_v = yield n.data_o print ("recv", mid, out_i, hex(out_v)) @@ -204,33 +110,49 @@ class OutputTest: stall_range = randint(0, 3) stall = randint(0, stall_range) != 0 if stall: - yield n.i_ready.eq(0) + yield n.ready_i.eq(0) for i in range(stall_range): yield -class TestPriorityMuxPipe(MuxUnbufferedPipeline): +class TestPriorityMuxPipe(CombMuxOutPipe): + def __init__(self, num_rows): + self.num_rows = num_rows + stage = PassThroughStage() + CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows) + + +class TestSyncToPriorityPipe(Elaboratable): def __init__(self): self.num_rows = 4 - stage = PassThroughStage() - MuxUnbufferedPipeline.__init__(self, stage, n_len=self.num_rows) + self.pipe = PassThroughPipe() + self.muxpipe = TestPriorityMuxPipe(self.num_rows) + + self.p = self.pipe.p + self.n = self.muxpipe.n + + def elaborate(self, platform): + m = Module() + m.submodules.pipe = self.pipe + m.submodules.muxpipe = self.muxpipe + m.d.comb += self.pipe.n.connect_to_next(self.muxpipe.p) + return m def ports(self): - res = [self.p.i_valid, self.p.o_ready] + \ - self.p.i_data.ports() + res = [self.p.valid_i, self.p.ready_o] + \ + self.p.data_i.ports() for i in range(len(self.n)): - res += [self.n[i].i_ready, self.n[i].o_valid] + \ - [self.n[i].o_data] - #self.n[i].o_data.ports() + res += [self.n[i].ready_i, self.n[i].valid_o] + \ + [self.n[i].data_o] + #self.n[i].data_o.ports() return res if __name__ == '__main__': - dut = TestPriorityMuxPipe() + dut = TestSyncToPriorityPipe() vl = rtlil.convert(dut, ports=dut.ports()) with open("test_outmux_pipe.il", "w") as f: f.write(vl) - #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd") test = OutputTest(dut) run_simulation(dut, [test.rcv(1), test.rcv(0),