b93537d9d2a73bc70d3e4ba8cd47101d03d93df4
1 # nmigen: UnusedElaboratable=no
3 from collections
import OrderedDict
5 from ..hdl
.ast
import *
8 from ..hdl
.mem
import *
12 class BadElaboratable(Elaboratable
):
13 def elaborate(self
, platform
):
class FragmentGetTestCase(FHDLTestCase):
    """Checks that ``Fragment.get`` refuses objects that cannot be elaborated."""

    def test_get_wrong(self):
        """Expect ``AttributeError`` for ``None`` and for ``BadElaboratable()``.

        NOTE(review): in the context-manager form of ``assertRaises`` and
        ``assertWarns`` the ``msg`` keyword is only the text reported when the
        assertion itself fails; it is not compared with the message carried by
        the exception or warning. Use the ``*Regex`` variants if the text is
        meant to be checked.
        """
        # Case 1: ``None`` is handed to ``Fragment.get`` directly.
        rejects_none = self.assertRaises(
            AttributeError,
            msg="Object None cannot be elaborated")
        with rejects_none:
            Fragment.get(None, platform=None)

        # Case 2: a ``BadElaboratable`` instance; a ``UserWarning`` must be
        # emitted and an ``AttributeError`` raised by the same call.
        # (Presumably its ``elaborate()`` yields ``None`` -- confirm against
        # the class definition.)
        warns_missing_return = self.assertWarns(
            UserWarning,
            msg=".elaborate() returned None; missing return statement?")
        rejects_result = self.assertRaises(
            AttributeError,
            msg="Object None cannot be elaborated")
        # Entered left to right: the warning check wraps the exception check.
        with warns_missing_return, rejects_result:
            Fragment.get(BadElaboratable(), platform=None)
30 class FragmentGeneratedTestCase(FHDLTestCase
):
31 def test_find_subfragment(self
):
34 f1
.add_subfragment(f2
, "f2")
36 self
.assertEqual(f1
.find_subfragment(0), f2
)
37 self
.assertEqual(f1
.find_subfragment("f2"), f2
)
39 def test_find_subfragment_wrong(self
):
42 f1
.add_subfragment(f2
, "f2")
44 with self
.assertRaises(NameError,
45 msg
="No subfragment at index #1"):
46 f1
.find_subfragment(1)
47 with self
.assertRaises(NameError,
48 msg
="No subfragment with name 'fx'"):
49 f1
.find_subfragment("fx")
51 def test_find_generated(self
):
54 f2
.generated
["sig"] = sig
= Signal()
55 f1
.add_subfragment(f2
, "f2")
57 self
.assertEqual(SignalKey(f1
.find_generated("f2", "sig")),
61 class FragmentDriversTestCase(FHDLTestCase
):
64 self
.assertEqual(list(f
.iter_comb()), [])
65 self
.assertEqual(list(f
.iter_sync()), [])
68 class FragmentPortsTestCase(FHDLTestCase
):
79 self
.assertEqual(list(f
.iter_ports()), [])
81 f
._propagate
_ports
(ports
=(), all_undef_as_ports
=True)
82 self
.assertEqual(f
.ports
, SignalDict([]))
84 def test_iter_signals(self
):
86 f
.add_ports(self
.s1
, self
.s2
, dir="io")
87 self
.assertEqual(SignalSet((self
.s1
, self
.s2
)), f
.iter_signals())
89 def test_self_contained(self
):
96 f
._propagate
_ports
(ports
=(), all_undef_as_ports
=True)
97 self
.assertEqual(f
.ports
, SignalDict([]))
99 def test_infer_input(self
):
105 f
._propagate
_ports
(ports
=(), all_undef_as_ports
=True)
106 self
.assertEqual(f
.ports
, SignalDict([
110 def test_request_output(self
):
116 f
._propagate
_ports
(ports
=(self
.c1
,), all_undef_as_ports
=True)
117 self
.assertEqual(f
.ports
, SignalDict([
122 def test_input_in_subfragment(self
):
131 f1
.add_subfragment(f2
)
132 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
133 self
.assertEqual(f1
.ports
, SignalDict())
134 self
.assertEqual(f2
.ports
, SignalDict([
138 def test_input_only_in_subfragment(self
):
144 f1
.add_subfragment(f2
)
145 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
146 self
.assertEqual(f1
.ports
, SignalDict([
149 self
.assertEqual(f2
.ports
, SignalDict([
153 def test_output_from_subfragment(self
):
162 f1
.add_subfragment(f2
)
164 f1
._propagate_ports(ports
=(self
.c2
,), all_undef_as_ports
=True)
165 self
.assertEqual(f1
.ports
, SignalDict([
168 self
.assertEqual(f2
.ports
, SignalDict([
172 def test_output_from_subfragment_2(self
):
181 f1
.add_subfragment(f2
)
186 f2
.add_subfragment(f3
)
188 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
189 self
.assertEqual(f2
.ports
, SignalDict([
193 def test_input_output_sibling(self
):
199 f1
.add_subfragment(f2
)
204 f3
.add_driver(self
.c2
)
205 f1
.add_subfragment(f3
)
207 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
208 self
.assertEqual(f1
.ports
, SignalDict())
210 def test_output_input_sibling(self
):
216 f2
.add_driver(self
.c2
)
217 f1
.add_subfragment(f2
)
222 f1
.add_subfragment(f3
)
224 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
225 self
.assertEqual(f1
.ports
, SignalDict())
227 def test_input_cd(self
):
234 f
.add_driver(self
.c1
, "sync")
236 f
._propagate
_ports
(ports
=(), all_undef_as_ports
=True)
237 self
.assertEqual(f
.ports
, SignalDict([
243 def test_input_cd_reset_less(self
):
244 sync
= ClockDomain(reset_less
=True)
250 f
.add_driver(self
.c1
, "sync")
252 f
._propagate
_ports
(ports
=(), all_undef_as_ports
=True)
253 self
.assertEqual(f
.ports
, SignalDict([
258 def test_inout(self
):
261 f2
= Instance("foo", io_x
=s
)
262 f1
.add_subfragment(f2
)
264 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
265 self
.assertEqual(f1
.ports
, SignalDict([
269 def test_in_out_same_signal(self
):
272 f1
= Instance("foo", i_x
=s
, o_y
=s
)
274 f2
.add_subfragment(f1
)
276 f2
._propagate_ports(ports
=(), all_undef_as_ports
=True)
277 self
.assertEqual(f1
.ports
, SignalDict([
281 f3
= Instance("foo", o_y
=s
, i_x
=s
)
283 f4
.add_subfragment(f3
)
285 f4
._propagate_ports(ports
=(), all_undef_as_ports
=True)
286 self
.assertEqual(f3
.ports
, SignalDict([
290 def test_clk_rst(self
):
295 f
= f
.prepare(ports
=(ClockSignal("sync"), ResetSignal("sync")))
296 self
.assertEqual(f
.ports
, SignalDict([
301 def test_port_wrong(self
):
303 with self
.assertRaises(TypeError,
304 msg
="Only signals may be added as ports, not (const 1'd1)"):
305 f
.prepare(ports
=(Const(1),))
307 def test_port_not_iterable(self
):
309 with self
.assertRaises(TypeError,
310 msg
="`ports` must be either a list or a tuple, not 1"):
312 with self
.assertRaises(TypeError,
313 msg
="`ports` must be either a list or a tuple, not (const 1'd1)" +
314 " (did you mean `ports=(<signal>,)`, rather than `ports=<signal>`?)"):
315 f
.prepare(ports
=Const(1))
317 class FragmentDomainsTestCase(FHDLTestCase
):
318 def test_iter_signals(self
):
320 cd2
= ClockDomain(reset_less
=True)
325 f
.add_domains(cd1
, cd2
)
326 f
.add_driver(s1
, "cd1")
327 self
.assertEqual(SignalSet((cd1
.clk
, cd1
.rst
, s1
)), f
.iter_signals())
328 f
.add_driver(s2
, "cd2")
329 self
.assertEqual(SignalSet((cd1
.clk
, cd1
.rst
, cd2
.clk
, s1
, s2
)), f
.iter_signals())
331 def test_propagate_up(self
):
336 f1
.add_subfragment(f2
)
339 f1
._propagate_domains_up()
340 self
.assertEqual(f1
.domains
, {"cd": cd
})
342 def test_propagate_up_local(self
):
343 cd
= ClockDomain(local
=True)
347 f1
.add_subfragment(f2
)
350 f1
._propagate_domains_up()
351 self
.assertEqual(f1
.domains
, {})
353 def test_domain_conflict(self
):
354 cda
= ClockDomain("sync")
355 cdb
= ClockDomain("sync")
362 f
.add_subfragment(fa
, "a")
363 f
.add_subfragment(fb
, "b")
365 f
._propagate
_domains
_up
()
366 self
.assertEqual(f
.domains
, {"a_sync": cda
, "b_sync": cdb
})
367 (fa
, _
), (fb
, _
) = f
.subfragments
368 self
.assertEqual(fa
.domains
, {"a_sync": cda
})
369 self
.assertEqual(fb
.domains
, {"b_sync": cdb
})
371 def test_domain_conflict_anon(self
):
372 cda
= ClockDomain("sync")
373 cdb
= ClockDomain("sync")
380 f
.add_subfragment(fa
, "a")
381 f
.add_subfragment(fb
)
383 with self
.assertRaises(DomainError
,
384 msg
="Domain 'sync' is defined by subfragments 'a', <unnamed #1> of fragment "
385 "'top'; it is necessary to either rename subfragment domains explicitly, "
386 "or give names to subfragments"):
387 f
._propagate
_domains
_up
()
389 def test_domain_conflict_name(self
):
390 cda
= ClockDomain("sync")
391 cdb
= ClockDomain("sync")
398 f
.add_subfragment(fa
, "x")
399 f
.add_subfragment(fb
, "x")
401 with self
.assertRaises(DomainError
,
402 msg
="Domain 'sync' is defined by subfragments #0, #1 of fragment 'top', some "
403 "of which have identical names; it is necessary to either rename subfragment "
404 "domains explicitly, or give distinct names to subfragments"):
405 f
._propagate
_domains
_up
()
407 def test_domain_conflict_rename_drivers(self
):
408 cda
= ClockDomain("sync")
409 cdb
= ClockDomain("sync")
415 fb
.add_driver(ResetSignal("sync"), None)
417 f
.add_subfragment(fa
, "a")
418 f
.add_subfragment(fb
, "b")
420 f
._propagate
_domains
_up
()
421 fb_new
, _
= f
.subfragments
[1]
422 self
.assertEqual(fb_new
.drivers
, OrderedDict({
423 None: SignalSet((ResetSignal("b_sync"),))
426 def test_domain_conflict_rename_drivers(self
):
427 cda
= ClockDomain("sync")
428 cdb
= ClockDomain("sync")
436 f
.add_subfragment(fa
, "a")
437 f
.add_subfragment(fb
, "b")
438 f
.add_driver(s
, "b_sync")
440 f
._propagate
_domains
(lambda name
: ClockDomain(name
))
442 def test_propagate_down(self
):
448 f1
.add_subfragment(f2
)
450 f1
._propagate_domains_down()
451 self
.assertEqual(f2
.domains
, {"cd": cd
})
453 def test_propagate_down_idempotent(self
):
460 f1
.add_subfragment(f2
)
462 f1
._propagate_domains_down()
463 self
.assertEqual(f1
.domains
, {"cd": cd
})
464 self
.assertEqual(f2
.domains
, {"cd": cd
})
466 def test_propagate(self
):
472 f1
.add_subfragment(f2
)
474 new_domains
= f1
._propagate_domains(missing_domain
=lambda name
: None)
475 self
.assertEqual(f1
.domains
, {"cd": cd
})
476 self
.assertEqual(f2
.domains
, {"cd": cd
})
477 self
.assertEqual(new_domains
, [])
479 def test_propagate_missing(self
):
482 f1
.add_driver(s1
, "sync")
484 with self
.assertRaises(DomainError
,
485 msg
="Domain 'sync' is used but not defined"):
486 f1
._propagate_domains(missing_domain
=lambda name
: None)
488 def test_propagate_create_missing(self
):
491 f1
.add_driver(s1
, "sync")
493 f1
.add_subfragment(f2
)
495 new_domains
= f1
._propagate_domains(missing_domain
=lambda name
: ClockDomain(name
))
496 self
.assertEqual(f1
.domains
.keys(), {"sync"})
497 self
.assertEqual(f2
.domains
.keys(), {"sync"})
498 self
.assertEqual(f1
.domains
["sync"], f2
.domains
["sync"])
499 self
.assertEqual(new_domains
, [f1
.domains
["sync"]])
501 def test_propagate_create_missing_fragment(self
):
504 f1
.add_driver(s1
, "sync")
506 cd
= ClockDomain("sync")
510 new_domains
= f1
._propagate_domains(missing_domain
=lambda name
: f2
)
511 self
.assertEqual(f1
.domains
.keys(), {"sync"})
512 self
.assertEqual(f1
.domains
["sync"], f2
.domains
["sync"])
513 self
.assertEqual(new_domains
, [])
514 self
.assertEqual(f1
.subfragments
, [
518 def test_propagate_create_missing_fragment_many_domains(self
):
521 f1
.add_driver(s1
, "sync")
523 cd_por
= ClockDomain("por")
524 cd_sync
= ClockDomain("sync")
526 f2
.add_domains(cd_por
, cd_sync
)
528 new_domains
= f1
._propagate_domains(missing_domain
=lambda name
: f2
)
529 self
.assertEqual(f1
.domains
.keys(), {"sync", "por"})
530 self
.assertEqual(f2
.domains
.keys(), {"sync", "por"})
531 self
.assertEqual(f1
.domains
["sync"], f2
.domains
["sync"])
532 self
.assertEqual(new_domains
, [])
533 self
.assertEqual(f1
.subfragments
, [
537 def test_propagate_create_missing_fragment_wrong(self
):
540 f1
.add_driver(s1
, "sync")
543 f2
.add_domains(ClockDomain("foo"))
545 with self
.assertRaises(DomainError
,
546 msg
="Fragment returned by missing domain callback does not define requested "
547 "domain 'sync' (defines 'foo')."):
548 f1
._propagate_domains(missing_domain
=lambda name
: f2
)
551 class FragmentHierarchyConflictTestCase(FHDLTestCase
):
552 def setUp_self_sub(self
):
558 self
.f1
.add_statements(self
.c1
.eq(0))
559 self
.f1
.add_driver(self
.s1
)
560 self
.f1
.add_driver(self
.c1
, "sync")
562 self
.f1a
= Fragment()
563 self
.f1
.add_subfragment(self
.f1a
, "f1a")
566 self
.f2
.add_statements(self
.c2
.eq(1))
567 self
.f2
.add_driver(self
.s1
)
568 self
.f2
.add_driver(self
.c2
, "sync")
569 self
.f1
.add_subfragment(self
.f2
)
571 self
.f1b
= Fragment()
572 self
.f1
.add_subfragment(self
.f1b
, "f1b")
574 self
.f2a
= Fragment()
575 self
.f2
.add_subfragment(self
.f2a
, "f2a")
577 def test_conflict_self_sub(self
):
578 self
.setUp_self_sub()
580 self
.f1
._resolve_hierarchy_conflicts(mode
="silent")
581 self
.assertEqual(self
.f1
.subfragments
, [
586 self
.assertRepr(self
.f1
.statements
, """
588 (eq (sig c1) (const 1'd0))
589 (eq (sig c2) (const 1'd1))
592 self
.assertEqual(self
.f1
.drivers
, {
593 None: SignalSet((self
.s1
,)),
594 "sync": SignalSet((self
.c1
, self
.c2
)),
def test_conflict_self_sub_error(self):
    """Resolving conflicts with ``mode="error"`` must raise ``DriverConflict``.

    The fixture is built by ``setUp_self_sub()``. The ``msg`` text is what
    unittest reports if nothing is raised; it is not matched against the
    exception's own message.
    """
    self.setUp_self_sub()
    top = self.f1

    expect_conflict = self.assertRaises(
        DriverConflict,
        msg="Signal '(sig s1)' is driven from multiple fragments: top, top.<unnamed #1>")
    with expect_conflict:
        top._resolve_hierarchy_conflicts(mode="error")
def test_conflict_self_sub_warning(self):
    """Resolving conflicts with ``mode="warn"`` must emit ``DriverConflict``.

    The fixture is built by ``setUp_self_sub()``. The ``msg`` text is what
    unittest reports if no warning is seen; it is not matched against the
    warning's own message.
    """
    self.setUp_self_sub()
    top = self.f1

    expect_warning = self.assertWarns(
        DriverConflict,
        msg="Signal '(sig s1)' is driven from multiple fragments: top, top.<unnamed #1>; "
            "hierarchy will be flattened")
    with expect_warning:
        top._resolve_hierarchy_conflicts(mode="warn")
612 def setUp_sub_sub(self
):
620 self
.f2
.add_driver(self
.s1
)
621 self
.f2
.add_statements(self
.c1
.eq(0))
622 self
.f1
.add_subfragment(self
.f2
)
625 self
.f3
.add_driver(self
.s1
)
626 self
.f3
.add_statements(self
.c2
.eq(1))
627 self
.f1
.add_subfragment(self
.f3
)
629 def test_conflict_sub_sub(self
):
632 self
.f1
._resolve_hierarchy_conflicts(mode
="silent")
633 self
.assertEqual(self
.f1
.subfragments
, [])
634 self
.assertRepr(self
.f1
.statements
, """
636 (eq (sig c1) (const 1'd0))
637 (eq (sig c2) (const 1'd1))
641 def setUp_self_subsub(self
):
647 self
.f1
.add_driver(self
.s1
)
650 self
.f2
.add_statements(self
.c1
.eq(0))
651 self
.f1
.add_subfragment(self
.f2
)
654 self
.f3
.add_driver(self
.s1
)
655 self
.f3
.add_statements(self
.c2
.eq(1))
656 self
.f2
.add_subfragment(self
.f3
)
658 def test_conflict_self_subsub(self
):
659 self
.setUp_self_subsub()
661 self
.f1
._resolve_hierarchy_conflicts(mode
="silent")
662 self
.assertEqual(self
.f1
.subfragments
, [])
663 self
.assertRepr(self
.f1
.statements
, """
665 (eq (sig c1) (const 1'd0))
666 (eq (sig c2) (const 1'd1))
670 def setUp_memory(self
):
671 self
.m
= Memory(width
=8, depth
=4)
672 self
.fr
= self
.m
.read_port().elaborate(platform
=None)
673 self
.fw
= self
.m
.write_port().elaborate(platform
=None)
676 self
.f2
.add_subfragment(self
.fr
)
677 self
.f1
.add_subfragment(self
.f2
)
679 self
.f3
.add_subfragment(self
.fw
)
680 self
.f1
.add_subfragment(self
.f3
)
682 def test_conflict_memory(self
):
685 self
.f1
._resolve_hierarchy_conflicts(mode
="silent")
686 self
.assertEqual(self
.f1
.subfragments
, [
691 def test_conflict_memory_error(self
):
694 with self
.assertRaises(DriverConflict
,
695 msg
="Memory 'm' is accessed from multiple fragments: top.<unnamed #0>, "
697 self
.f1
._resolve_hierarchy_conflicts(mode
="error")
699 def test_conflict_memory_warning(self
):
702 with self
.assertWarns(DriverConflict
,
703 msg
="Memory 'm' is accessed from multiple fragments: top.<unnamed #0>, "
704 "top.<unnamed #1>; hierarchy will be flattened"):
705 self
.f1
._resolve_hierarchy_conflicts(mode
="warn")
707 def test_explicit_flatten(self
):
710 self
.f2
.flatten
= True
711 self
.f1
.add_subfragment(self
.f2
)
713 self
.f1
._resolve_hierarchy_conflicts(mode
="silent")
714 self
.assertEqual(self
.f1
.subfragments
, [])
716 def test_no_conflict_local_domains(self
):
718 cd1
= ClockDomain("d", local
=True)
720 f1
.add_driver(ClockSignal("d"))
722 cd2
= ClockDomain("d", local
=True)
724 f2
.add_driver(ClockSignal("d"))
726 f3
.add_subfragment(f1
)
727 f3
.add_subfragment(f2
)
731 class InstanceTestCase(FHDLTestCase
):
732 def test_construct(self
):
739 inst
= Instance("foo",
741 ("p", "PARAM1", 0x1234),
751 self
.assertEqual(inst
.attrs
, OrderedDict([
755 self
.assertEqual(inst
.parameters
, OrderedDict([
759 self
.assertEqual(inst
.named_ports
, OrderedDict([
768 def test_cast_ports(self
):
769 inst
= Instance("foo",
777 self
.assertRepr(inst
.named_ports
["s1"][0], "(const 1'd1)")
778 self
.assertRepr(inst
.named_ports
["s2"][0], "(const 2'd2)")
779 self
.assertRepr(inst
.named_ports
["s3"][0], "(const 2'd3)")
780 self
.assertRepr(inst
.named_ports
["s4"][0], "(const 3'd4)")
781 self
.assertRepr(inst
.named_ports
["s5"][0], "(const 3'd5)")
782 self
.assertRepr(inst
.named_ports
["s6"][0], "(const 3'd6)")
784 def test_wrong_construct_arg(self
):
786 with self
.assertRaises(NameError,
787 msg
="Instance argument ('', 's1', (sig s)) should be a tuple "
788 "(kind, name, value) where kind is one of \"p\", \"i\", \"o\", or \"io\""):
789 Instance("foo", ("", "s1", s
))
791 def test_wrong_construct_kwarg(self
):
793 with self
.assertRaises(NameError,
794 msg
="Instance keyword argument x_s1=(sig s) does not start with one of "
795 "\"p_\", \"i_\", \"o_\", or \"io_\""):
796 Instance("foo", x_s1
=s
)
801 self
.pins
= Signal(8)
802 self
.datal
= Signal(4)
803 self
.datah
= Signal(4)
804 self
.inst
= Instance("cpu",
809 o_data
=Cat(self
.datal
, self
.datah
),
812 self
.wrap
= Fragment()
813 self
.wrap
.add_subfragment(self
.inst
)
818 self
.assertEqual(f
.type, "cpu")
819 self
.assertEqual(f
.parameters
, OrderedDict([("RESET", 0x1234)]))
820 self
.assertEqual(list(f
.named_ports
.keys()), ["clk", "rst", "stb", "data", "pins"])
821 self
.assertEqual(f
.ports
, SignalDict([]))
823 def test_prepare(self
):
825 f
= self
.wrap
.prepare()
826 sync_clk
= f
.domains
["sync"].clk
827 self
.assertEqual(f
.ports
, SignalDict([
833 def test_prepare_explicit_ports(self
):
835 f
= self
.wrap
.prepare(ports
=[self
.rst
, self
.stb
])
836 sync_clk
= f
.domains
["sync"].clk
837 sync_rst
= f
.domains
["sync"].rst
838 self
.assertEqual(f
.ports
, SignalDict([
846 def test_prepare_slice_in_port(self
):
849 f
.add_subfragment(Instance("foo", o_O
=s
[0]))
850 f
.add_subfragment(Instance("foo", o_O
=s
[1]))
851 fp
= f
.prepare(ports
=[s
], missing_domain
=lambda name
: None)
852 self
.assertEqual(fp
.ports
, SignalDict([
856 def test_prepare_attrs(self
):
858 self
.inst
.attrs
["ATTR"] = 1
859 f
= self
.inst
.prepare()
860 self
.assertEqual(f
.attrs
, OrderedDict([