Add example code for headless SoC
authorJean THOMAS <git0@pub.jeanthomas.me>
Tue, 16 Jun 2020 07:59:11 +0000 (09:59 +0200)
committerJean THOMAS <git0@pub.jeanthomas.me>
Tue, 16 Jun 2020 07:59:11 +0000 (09:59 +0200)
examples/headless-ecpix5.py [new file with mode: 0644]
examples/headless/.gitignore [new file with mode: 0644]
examples/headless/Makefile [new file with mode: 0644]
examples/headless/README.md [new file with mode: 0644]
examples/headless/main.c [new file with mode: 0644]
examples/uartbridge.py [new file with mode: 0644]

diff --git a/examples/headless-ecpix5.py b/examples/headless-ecpix5.py
new file mode 100644 (file)
index 0000000..a7e4867
--- /dev/null
@@ -0,0 +1,232 @@
+# This file is Copyright (c) 2020 LambdaConcept <contact@lambdaconcept.com>
+
+from nmigen import *
+from nmigen.lib.cdc import ResetSynchronizer
+from nmigen_soc import wishbone, memory
+
+from lambdasoc.cpu.minerva import MinervaCPU
+from lambdasoc.periph.intc import GenericInterruptController
+from lambdasoc.periph.serial import AsyncSerialPeripheral
+from lambdasoc.periph.sram import SRAMPeripheral
+from lambdasoc.periph.timer import TimerPeripheral
+from lambdasoc.periph import Peripheral
+from lambdasoc.soc.base import SoC
+
+from gram.core import gramCore
+from gram.phy.ecp5ddrphy import ECP5DDRPHY
+from gram.modules import MT41K256M16
+from gram.frontend.wishbone import gramWishbone
+
+from customecpix5 import ECPIX5Platform
+from uartbridge import UARTBridge
+
+class PLL(Elaboratable):
+    def __init__(self, clkin, clksel=Signal(shape=2, reset=2), clkout1=Signal(), clkout2=Signal(), clkout3=Signal(), clkout4=Signal(), lock=Signal(), CLKI_DIV=1, CLKFB_DIV=1, CLK1_DIV=3, CLK2_DIV=4, CLK3_DIV=5, CLK4_DIV=6):
+        self.clkin = clkin
+        self.clkout1 = clkout1
+        self.clkout2 = clkout2
+        self.clkout3 = clkout3
+        self.clkout4 = clkout4
+        self.clksel = clksel
+        self.lock = lock
+        self.CLKI_DIV = CLKI_DIV
+        self.CLKFB_DIV = CLKFB_DIV
+        self.CLKOP_DIV = CLK1_DIV
+        self.CLKOS_DIV = CLK2_DIV
+        self.CLKOS2_DIV = CLK3_DIV
+        self.CLKOS3_DIV = CLK4_DIV
+        self.ports = [
+            self.clkin,
+            self.clkout1,
+            self.clkout2,
+            self.clkout3,
+            self.clkout4,
+            self.clksel,
+            self.lock,
+        ]
+
+    def elaborate(self, platform):
+        clkfb = Signal()
+        pll = Instance("EHXPLLL",
+                       p_PLLRST_ENA='DISABLED',
+                       p_INTFB_WAKE='DISABLED',
+                       p_STDBY_ENABLE='DISABLED',
+                       p_CLKOP_FPHASE=0,
+                       p_CLKOP_CPHASE=11,
+                       p_OUTDIVIDER_MUXA='DIVA',
+                       p_CLKOP_ENABLE='ENABLED',
+                       p_CLKOP_DIV=self.CLKOP_DIV,
+                       p_CLKOS_DIV=self.CLKOS_DIV,
+                       p_CLKOS2_DIV=self.CLKOS2_DIV,
+                       p_CLKOS3_DIV=self.CLKOS3_DIV,
+                       p_CLKFB_DIV=self.CLKFB_DIV,
+                       p_CLKI_DIV=self.CLKI_DIV,
+                       p_FEEDBK_PATH='CLKOP',
+                       i_CLKI=self.clkin,
+                       i_CLKFB=clkfb,
+                       i_RST=0,
+                       i_STDBY=0,
+                       i_PHASESEL0=0,
+                       i_PHASESEL1=0,
+                       i_PHASEDIR=0,
+                       i_PHASESTEP=0,
+                       i_PLLWAKESYNC=0,
+                       i_ENCLKOP=0,
+                       i_ENCLKOS=0,
+                       i_ENCLKOS2=0,
+                       i_ENCLKOS3=0,
+                       o_CLKOP=self.clkout1,
+                       o_CLKOS=self.clkout2,
+                       o_CLKOS2=self.clkout3,
+                       o_CLKOS3=self.clkout4,
+                       o_LOCK=self.lock,
+                       )
+        m = Module()
+        m.submodules += pll
+        with m.If(self.clksel == 0):
+            m.d.comb += clkfb.eq(self.clkout1)
+        with m.Elif(self.clksel == 1):
+            m.d.comb += clkfb.eq(self.clkout2)
+        with m.Elif(self.clksel == 2):
+            m.d.comb += clkfb.eq(self.clkout3)
+        with m.Else():
+            m.d.comb += clkfb.eq(self.clkout4)
+        return m
+
+
+class ECPIX5CRG(Elaboratable):
+    def __init__(self):
+        self.stop = Signal()
+
+    def elaborate(self, platform):
+        m = Module()
+
+        # Get 100Mhz from oscillator
+        clk100 = platform.request("clk100")
+        cd_rawclk = ClockDomain("rawclk", local=True, reset_less=True)
+        m.d.comb += cd_rawclk.clk.eq(clk100)
+        m.domains += cd_rawclk
+
+        # Reset
+        reset = platform.request("rst")
+        gsr0 = Signal()
+        gsr1 = Signal()
+
+        m.submodules += [
+            Instance("FD1S3AX", p_GSR="DISABLED", i_CK=ClockSignal("rawclk"), i_D=~reset.i, o_Q=gsr0),
+            Instance("FD1S3AX", p_GSR="DISABLED", i_CK=ClockSignal("rawclk"), i_D=gsr0,   o_Q=gsr1),
+            Instance("SGSR", i_CLK=ClockSignal("rawclk"), i_GSR=~gsr1),
+        ]
+
+        # Power-on delay (655us)
+        podcnt = Signal(16, reset=2**16-1)
+        pod_done = Signal()
+        with m.If(podcnt != 0):
+            m.d.rawclk += podcnt.eq(podcnt-1)
+        m.d.comb += pod_done.eq(podcnt == 0)
+
+        # Generating sync2x (200Mhz) and init (25Mhz) from clk100
+        cd_sync2x = ClockDomain("sync2x", local=False)
+        # cd_sync2x_unbuf = ClockDomain("sync2x_unbuf", local=True, reset_less=True)
+        cd_init = ClockDomain("init", local=False)
+        cd_sync = ClockDomain("sync", local=False, reset_less=True)
+        m.submodules.pll = pll = PLL(ClockSignal("rawclk"), CLKI_DIV=1, CLKFB_DIV=2, CLK1_DIV=2, CLK2_DIV=16, CLK3_DIV=4,
+            clkout1=ClockSignal("sync2x"), clkout2=ClockSignal("init"), clkout3=ClockSignal("sync"))
+        # m.submodules += Instance("ECLKSYNCB",
+        #         i_ECLKI = ClockSignal("sync2x_unbuf"),
+        #         i_STOP  = self.stop,
+        #         o_ECLKO = ClockSignal("sync2x"))
+        # m.domains += cd_sync2x_unbuf
+        m.domains += cd_sync2x
+        m.domains += cd_init
+        #m.submodules += ResetSynchronizer(~pll.lock|~pod_done, domain="init")
+
+        # Generating sync (100Mhz) from sync2x
+        
+        # m.submodules += Instance("CLKDIVF",
+        #     p_DIV="2.0",
+        #     i_ALIGNWD=0,
+        #     i_CLKI=ClockSignal("sync2x"),
+        #     i_RST=0,
+        #     o_CDIVX=ClockSignal("sync"))
+        m.domains += cd_sync
+
+        return m
+
+class OldCRG(Elaboratable):
+    def elaborate(self, platform):
+        m = Module()
+
+        m.submodules.pll = pll = PLL(ClockSignal(
+            "sync"), CLKI_DIV=1, CLKFB_DIV=2, CLK1_DIV=2, CLK2_DIV=16)
+        cd_sync2x = ClockDomain("sync2x", local=False)
+        m.d.comb += cd_sync2x.clk.eq(pll.clkout1)
+        m.domains += cd_sync2x
+
+        cd_init = ClockDomain("init", local=False)
+        m.d.comb += cd_init.clk.eq(pll.clkout2)
+        m.domains += cd_init
+
+        return m
+
+
+class DDR3SoC(SoC, Elaboratable):
+    def __init__(self, *, clk_freq,
+                 ddrphy_addr, dramcore_addr,
+                 ddr_addr):
+        self._arbiter = wishbone.Arbiter(addr_width=30, data_width=32, granularity=8,
+                                         features={"cti", "bte"})
+        self._decoder = wishbone.Decoder(addr_width=30, data_width=32, granularity=8,
+                                         features={"cti", "bte"})
+
+        self.ub = UARTBridge(divisor=868, pins=platform.request("uart", 0))
+        self._arbiter.add(self.ub.bus)
+
+        self.ddrphy = ECP5DDRPHY(platform.request("ddr3", 0))
+        self._decoder.add(self.ddrphy.bus, addr=ddrphy_addr)
+
+        ddrmodule = MT41K256M16(clk_freq, "1:4")
+
+        self.dramcore = gramCore(
+            phy=self.ddrphy,
+            geom_settings=ddrmodule.geom_settings,
+            timing_settings=ddrmodule.timing_settings,
+            clk_freq=clk_freq)
+        self._decoder.add(self.dramcore.bus, addr=dramcore_addr)
+
+        self.drambone = gramWishbone(self.dramcore)
+        self._decoder.add(self.drambone.bus, addr=ddr_addr)
+
+        self.memory_map = self._decoder.bus.memory_map
+
+        self.clk_freq = clk_freq
+
+    def elaborate(self, platform):
+        m = Module()
+
+        m.submodules.sysclk = OldCRG()
+
+        m.submodules.arbiter = self._arbiter
+        m.submodules.ub = self.ub
+
+        m.submodules.decoder = self._decoder
+        m.submodules.ddrphy = self.ddrphy
+        m.submodules.dramcore = self.dramcore
+        m.submodules.drambone = self.drambone
+
+        m.d.comb += [
+            self._arbiter.bus.connect(self._decoder.bus),
+        ]
+
+        return m
+
+
+if __name__ == "__main__":
+    platform = ECPIX5Platform()
+
+    soc = DDR3SoC(clk_freq=int(platform.default_clk_frequency),
+        ddrphy_addr=0x00008000, dramcore_addr=0x00009000,
+        ddr_addr=0x10000000)
+
+    soc.build(do_build=True)
+    platform.build(soc, do_program=True)
diff --git a/examples/headless/.gitignore b/examples/headless/.gitignore
new file mode 100644 (file)
index 0000000..2557d7c
--- /dev/null
@@ -0,0 +1,2 @@
+*.o
+headless
diff --git a/examples/headless/Makefile b/examples/headless/Makefile
new file mode 100644 (file)
index 0000000..66b3e18
--- /dev/null
@@ -0,0 +1,13 @@
+OBJS := main.o
+
+CFLAGS += -g -I../../libgram/include/
+
+%.o: %.c
+       $(CC) $(CFLAGS) -c $< -o $@
+
+headless: $(OBJS) ../../libgram/libgram.a
+       $(CC) $(LDFLAGS) $(OBJS) ../../libgram/libgram.a -o $@
+
+clean:
+       rm -f $(OBJS)
+       rm -f headless
diff --git a/examples/headless/README.md b/examples/headless/README.md
new file mode 100644 (file)
index 0000000..4b7ef4c
--- /dev/null
@@ -0,0 +1 @@
+# Test application for libgram controlling a headless SoC
diff --git a/examples/headless/main.c b/examples/headless/main.c
new file mode 100644 (file)
index 0000000..ee73eb8
--- /dev/null
@@ -0,0 +1,127 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+
+#include <fcntl.h>
+#include <errno.h>
+#include <termios.h>
+#include <unistd.h>
+
+#include <netinet/in.h>
+
+#include <gram.h>
+
+uint32_t gram_read(struct gramCtx *ctx, void *addr) {
+       uint8_t commands[6] = { 0x02, 0x01 };
+       uint32_t reply;
+       int received, sent, fd;
+
+       fd = *(int*)(ctx->user_data);
+
+       *(uint32_t*)(&commands[2]) = htonl((uint32_t)addr >> 2);
+
+       sent = write(fd, commands, sizeof(commands));
+       if (sent != sizeof(commands)) {
+               fprintf(stderr, "gram_read error (sent bytes length mismatch)\n");
+       }
+       received = read(fd, &reply, sizeof(reply));
+       if (received != sizeof(reply)) {
+               fprintf(stderr, "gram_read error (read bytes length mismatch: %d != %d)\n", received, sizeof(reply));
+       }
+
+       return ntohl(reply);
+}
+
+int gram_write(struct gramCtx *ctx, void *addr, uint32_t value) {
+       uint8_t commands[10] = { 0x01, 0x01 };
+       int sent;
+
+       *(uint32_t*)(commands+2) = htonl((uint32_t)addr >> 2);
+       *(uint32_t*)(commands+6) = htonl(value);
+
+       sent = write(*(int*)(ctx->user_data), commands, sizeof(commands));
+       if (sent != sizeof(commands)) {
+               fprintf(stderr, "gram_write error (sent bytes length mismatch)\n");
+               return -1;
+       }
+
+       return 0;
+}
+
+int serial_setup(const char *devname, int baudrate) {
+       struct termios tty;
+       int serialfd;
+       
+       serialfd = open("/dev/ttyUSB1", O_RDWR|O_NOCTTY);
+       if (serialfd < 0) {
+               fprintf(stderr, "Error %i from open: %s\n", errno, strerror(errno));
+       }
+
+       memset(&tty, 0, sizeof(tty));
+       if (tcgetattr(serialfd, &tty) != 0) {
+       fprintf(stderr, "Error %i from tcgetattr: %s\n", errno, strerror(errno));
+       }
+
+       /* Parameters from flterm */
+       tcgetattr(serialfd, &tty);
+       tty.c_cflag = B115200;
+       tty.c_cflag |= CS8;
+       tty.c_cflag |= CREAD;
+       tty.c_iflag = IGNPAR | IGNBRK;
+       tty.c_cflag |= CLOCAL;
+       tty.c_oflag = 0;
+       tty.c_lflag = 0;
+       tty.c_cc[VTIME] = 0;
+       tty.c_cc[VMIN] = 1;
+       tcsetattr(serialfd, TCSANOW, &tty);
+       tcflush(serialfd, TCOFLUSH);
+       tcflush(serialfd, TCIFLUSH);
+
+       cfsetispeed(&tty, B115200);
+       cfsetospeed(&tty, B115200);
+
+       cfmakeraw(&tty);
+
+       tcflush(serialfd, TCIFLUSH );
+       if (tcsetattr(serialfd, TCSANOW, &tty) != 0) {
+       fprintf(stderr, "Error %i from tcsetattr: %s\n", errno, strerror(errno));
+       }
+
+       return serialfd;
+}
+
+int main(int argc, char *argv[]) {
+       struct gramCtx ctx;
+       int serial_port, baudrate = 0;
+
+       if (argc != 3) {
+               fprintf(stderr, "Usage: %s port baudrate\n", argv[0]);
+               return EXIT_FAILURE;
+       }
+
+       sscanf(argv[2], "%d", &baudrate);
+       if (baudrate <= 0) {
+               fprintf(stderr, "%d is not a valid baudrate\n", baudrate);
+       }
+
+       printf("Port: %s, baudrate: %d\n", argv[1], baudrate);
+
+       serial_port = serial_setup(argv[1], baudrate);
+       ctx.user_data = &serial_port;
+
+       printf("gram init... ");
+       gram_init(&ctx, (void*)0x10000000, (void*)0x00009000, (void*)0x00008000);
+       printf("done\n");
+
+       printf("memtest... \n");
+       uint32_t *ddr = 0x10000000;
+       for (size_t i = 0; i < 1000; i++) {
+               gram_write(&ctx, &(ddr[i]), 0x12345678);
+               printf("%p = %08x\n", &(ddr[i]), gram_read(&ctx, &(ddr[i])));
+       }
+
+       close(serial_port);
+
+       return EXIT_SUCCESS;
+}
diff --git a/examples/uartbridge.py b/examples/uartbridge.py
new file mode 100644 (file)
index 0000000..9e2cfbb
--- /dev/null
@@ -0,0 +1,306 @@
+from nmigen import *
+from nmigen.lib.io import pin_layout
+from nmigen_soc import wishbone
+from nmigen_stdio.serial import AsyncSerial, AsyncSerialTX
+from nmigen.back.pysim import *
+
+from lambdasoc.periph import Peripheral
+
+import unittest
+
+__ALL__ = ["UARTBridge"]
+
+class UARTBridge(Elaboratable):
+    def __init__(self, divisor, pins):
+        self.bus = wishbone.Interface(addr_width=30,
+                                      data_width=32, granularity=32)
+        self._pins = pins
+        self._divisor = divisor
+
+    def elaborate(self, platform):
+        m = Module()
+
+        m.submodules.serial = serial = AsyncSerial(divisor=self._divisor, pins=self._pins)
+
+        address_width = 32
+        data_width = 32
+
+        cmd = Signal(8)
+        length = Signal(8)
+        address = Signal(address_width)
+        data = Signal(data_width)
+        bytes_count = Signal(range(data_width//8))
+        words_count = Signal(8)
+
+        m.d.comb += [
+            self.bus.dat_w.eq(data),
+            self.bus.adr.eq(address),
+        ]
+
+        with m.FSM():
+            with m.State("Receive-Cmd"):
+                m.d.comb += serial.rx.ack.eq(1)
+
+                # Reset registers
+                m.d.sync += [
+                    bytes_count.eq(data_width//8-1),
+                    words_count.eq(0),
+                ]
+
+                with m.If(serial.rx.rdy):
+                    m.d.sync += cmd.eq(serial.rx.data)
+                    m.next = "Receive-Length"
+
+            with m.State("Receive-Length"):
+                m.d.comb += serial.rx.ack.eq(1)
+
+                with m.If(serial.rx.rdy):
+                    m.d.sync += length.eq(serial.rx.data)
+                    m.next = "Receive-Address"
+
+            with m.State("Receive-Address"):
+                m.d.comb += serial.rx.ack.eq(1)
+
+                with m.If(serial.rx.rdy):
+                    m.d.sync += [
+                        address.eq(Cat(serial.rx.data, address)),
+                        bytes_count.eq(bytes_count-1),
+                    ]
+
+                    with m.If(bytes_count == 0):
+                        with m.Switch(cmd):
+                            with m.Case(0x01):
+                                m.next = "Handle-Write"
+                            with m.Case(0x02):
+                                m.next = "Handle-Read"
+                            with m.Case():
+                                m.next = "Receive-Cmd"
+
+            with m.State("Handle-Write"):
+                m.d.comb += serial.rx.ack.eq(1)
+
+                with m.If(serial.rx.rdy):
+                    m.d.sync += [
+                        data.eq(Cat(serial.rx.data, data)),
+                        bytes_count.eq(bytes_count-1),
+                    ]
+                    with m.If(bytes_count == 0):
+                        m.next = "Write-Data"
+
+            with m.State("Write-Data"):
+                m.d.comb += [
+                    self.bus.stb.eq(1),
+                    self.bus.we.eq(1),
+                    self.bus.cyc.eq(1),
+                    self.bus.sel.eq(1),
+                ]
+
+                with m.If(self.bus.ack):
+                    m.next = "Receive-Cmd"
+
+
+            with m.State("Handle-Read"):
+                m.d.comb += [
+                    self.bus.stb.eq(1),
+                    self.bus.we.eq(0),
+                    self.bus.cyc.eq(1),
+                ]
+
+                with m.If(self.bus.ack):
+                    m.d.sync += [
+                        bytes_count.eq(data_width//8-1),
+                        data.eq(self.bus.dat_r),
+                    ]
+                    m.next = "Send-Data"
+
+            with m.State("Send-Data"):
+                m.d.comb += serial.tx.ack.eq(1)
+
+                with m.Switch(bytes_count):
+                    for i in range(data_width//8):
+                        with m.Case(i):
+                            m.d.comb += serial.tx.data.eq(data[i*8:(i+1)*8])
+
+                with m.If(serial.tx.rdy):
+                    m.next = "Send-Data-Wait"
+
+            with m.State("Send-Data-Wait"):
+                with m.If(serial.tx.rdy):
+                    m.d.sync += [
+                        bytes_count.eq(bytes_count-1),
+                    ]
+
+                    with m.If(bytes_count == 0):
+                        m.next = "Receive-Cmd"
+                    with m.Else():
+                        m.next = "Send-Data"
+
+        return m
+
+def serial_write(serial, val):
+    while not (yield serial.tx.rdy):
+        yield
+
+    yield serial.tx.data.eq(val)
+    yield serial.tx.ack.eq(1)
+    yield
+
+    while (yield serial.tx.rdy):
+        yield
+
+    yield serial.tx.ack.eq(0)
+
+    while not (yield serial.tx.rdy):
+        yield
+
+    yield
+
+def serial_read(serial):
+    yield serial.rx.ack.eq(1)
+
+    while not (yield serial.rx.rdy):
+        yield
+
+    data = (yield serial.rx.data)
+    yield serial.rx.ack.eq(0)
+
+    while (yield serial.rx.rdy):
+        yield
+
+    return data
+
+class UARTBridgeTestCase(unittest.TestCase):
+    # Minimum 5, lowest makes the simulation faster
+    divisor = 5
+    timeout = 10000
+
+    def test_read(self):
+        pins = Record([("rx", pin_layout(1, dir="i")),
+                       ("tx", pin_layout(1, dir="o"))])
+        dut = UARTBridge(divisor=self.divisor, pins=pins)
+        serial = AsyncSerial(divisor=self.divisor)
+        m = Module()
+        m.submodules.bridge = dut
+        m.submodules.serial = serial
+        m.d.comb += [
+            pins.rx.i.eq(serial.tx.o),
+            serial.rx.i.eq(pins.tx.o),
+        ]
+
+        def process():
+            # Send read command
+            yield from serial_write(serial, 0x02)
+            yield
+
+            # Length = 1
+            yield from serial_write(serial, 0x01)
+            yield
+
+            # Send 0x4000 as address
+            yield from serial_write(serial, 0x00)
+            yield
+            yield from serial_write(serial, 0x00)
+            yield
+            yield from serial_write(serial, 0x40)
+            yield
+            yield from serial_write(serial, 0x00)
+            yield
+            
+            # Handle wishbone request
+            timeout = 0
+            while not (yield dut.bus.cyc):
+                yield
+                timeout += 1
+                if timeout > self.timeout:
+                    raise RuntimeError("Simulation timed out")
+
+            # Ensure Wishbone address is the one we asked for
+            self.assertEqual((yield dut.bus.adr), 0x00004000)
+            self.assertFalse((yield dut.bus.we))
+            
+            # Answer
+            yield dut.bus.dat_r.eq(0x0DEFACED)
+            yield dut.bus.ack.eq(1)
+            yield
+
+            # Check response on UART
+            rx = yield from serial_read(serial)
+            self.assertEqual(rx, 0x0D)
+            rx = yield from serial_read(serial)
+            self.assertEqual(rx, 0xEF)
+            rx = yield from serial_read(serial)
+            self.assertEqual(rx, 0xAC)
+            rx = yield from serial_read(serial)
+            self.assertEqual(rx, 0xED)            
+
+            yield
+
+        sim = Simulator(m)
+        with sim.write_vcd("test_uartbridge.vcd"):
+            sim.add_clock(1e-6)
+            sim.add_sync_process(process)
+            sim.run()
+
+    def test_write(self):
+        pins = Record([("rx", pin_layout(1, dir="i")),
+                       ("tx", pin_layout(1, dir="o"))])
+        dut = UARTBridge(divisor=self.divisor, pins=pins)
+        serial = AsyncSerial(divisor=self.divisor)
+        m = Module()
+        m.submodules.bridge = dut
+        m.submodules.serial = serial
+        m.d.comb += [
+            pins.rx.i.eq(serial.tx.o),
+            serial.rx.i.eq(pins.tx.o),
+        ]
+
+        def process():
+            # Send write command
+            yield from serial_write(serial, 0x01)
+            yield
+
+            # Length = 1
+            yield from serial_write(serial, 0x01)
+            yield
+
+            # Send 0x4000 as address
+            yield from serial_write(serial, 0x00)
+            yield
+            yield from serial_write(serial, 0x00)
+            yield
+            yield from serial_write(serial, 0x40)
+            yield
+            yield from serial_write(serial, 0x00)
+            yield
+
+            # Send 0xFEEDFACE as value
+            yield from serial_write(serial, 0xFE)
+            yield
+            yield from serial_write(serial, 0xED)
+            yield
+            yield from serial_write(serial, 0xFA)
+            yield
+            yield from serial_write(serial, 0xCE)
+            
+            # Handle wishbone request
+            timeout = 0
+            while not (yield dut.bus.cyc):
+                yield
+                timeout += 1
+                if timeout > self.timeout:
+                    raise RuntimeError("Simulation timed out")
+
+            # Ensure Wishbone address is the one we asked for
+            self.assertEqual((yield dut.bus.adr), 0x00004000)
+            self.assertEqual((yield dut.bus.dat_w), 0xFEEDFACE)
+            self.assertTrue((yield dut.bus.we))
+            
+            # Answer
+            yield dut.bus.ack.eq(1)
+            yield
+
+        sim = Simulator(m)
+        with sim.write_vcd("test_uartbridge.vcd"):
+            sim.add_clock(1e-6)
+            sim.add_sync_process(process)
+            sim.run()