A Neopixel solution for Colorlight boards ...
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

320 lines
14 KiB

#!/usr/bin/env python3
#
# neopixelengine.py
#
# NeoPixel protocol engine (wanna-be), see:
# http://www.adafruit.com/datasheets/WS2812.pdf
# https://wp.josh.com/2014/05/13/ws2812-neopixels-are-not-so-finicky-once-you-get-to-know-them/
#
# History:
# --------
# 15.10.20/KQ Initial module export, test bench added
# 16.10.20/KQ Several submodules (incl. FSM) tested
# 17.10.20/KQ NeoPixel engine up & driving LEDs
# 18.10.20/KQ Cleanup & more 'array'
# 19.10.20/KQ Renamed from 'neopixelprotocol'
# 27.10.20/KQ Using AutoDoc etc(improved documentation generation)
# 29.10.20/KQ Memory object replaces array (due to resource consumption)
# 02.01.21/KQ DRAM loader integrated
#
from migen import *
from migen.fhdl.specials import Memory
from litex.soc.interconnect.csr import AutoCSR, CSRStatus, CSRStorage, CSRField, CSRAccess
from litex.soc.integration.doc import AutoDoc, ModuleDoc
from litex.soc.interconnect.csr import *
class NeoPixelEngine(Module, AutoCSR, AutoDoc, ModuleDoc):
"""
NeoPixelEngine class provides the protocol logic to drive NeoPixel LED strips
Usage:
######
1. Freeze operations by setting ``bEnable`` to false (0)
2. Fill NeoPixelEngine's local array of GRB values (Green/Red/Blue)
from DRAM location
Load ``b32DRAMAddress`` with a 32-bit DRAM memory pointer.
Indicate the offset (where to store) via writing to ``b6StoreOffset``.
Repeat for all DRAM address 'til end of array ...
3. Indicate no. of DRAM addresses stored by writing the # to ``b6NoOfTables``
4. Indicate to NeoPixelEngine the actual no. of pixels used by setting up ``b9Len``.
For now, all arrays have to have the same max. length.
5. Finally, enable processing by setting ``bEnable`` to true (1).
6. NPE processing will now run async. to CPU until reset or s.a. (1.).
CPU from now on will write to DRAM only (yet, L2 cache will have to be flushed),
all data will be picked up by FPGA automatically ...
Inputs:
#######
:b32DRAMAddress: New DRAM address to be loaded (32 bits) into local memory
:b6StoreOffset: Offset (0..63) into local memory to load b32DRAMAddress to
:b6NoOfTables: No. of actually used tables (0..63)
:b9Len: Length (0..511) of actual 32-bit data entries (i.e. # of NeoPixels)
:bEnable: To enable running (after data preparation)
Output:
#######
:bDataPin: NeoPixel 'Din' pin output (wire to actual output pin ... ;)
"""
def __init__(self, n_TABLES=1, n_LEDs=3, dramtransfer=None):
# On Colorlight-5A-75B/Lattice ECP5-25:
# 63 pins simultaneously driven (w/ 511 NeoPixels each!) yield 82% FPGA TRELLIS SLICE usage
# Inputs
self.b32DRAMAddress = CSRStorage(32, reset_less=True,
fields=[CSRField("Data2Load", size=32, description="*Field*: 32-Bit value")],
description="""
Load value (32-bit DRAM address).
Use ``b6StoreOffset`` first to indicate array location where to store new DRAM address value.
""")
self.b6StoreOffset = CSRStorage(6, reset_less=True,
fields=[CSRField("LoadOffset", size=6, description="*Field*: 6-Bit value (0..63)")],
description="""
Offset into local memory for 32-bit DRAM address values.
Prepare this one second, then indicate value to store via ``b32DRAMAddress``.
""")
self.b6NoOfTables = CSRStorage(6, reset_less=True,
fields=[CSRField("NoOfTables", size=6, description="*Field*: 6-Bit value (0..63)")],
description="""
No. of actually used different DRAM address blocks
""")
self.b9Len = CSRStorage(9, reset_less=True,
fields=[CSRField("Len", size=9, description="*Field*: 9-Bit value (0..511)")],
description="""
No. of active (GRB) entries.
Indicate actual # of elements used (may be less than max!)
""")
self.bEnable = CSRStorage(1, reset_less=True,
fields=[CSRField("Enable", size=1, description="*Field*: bit", values=[
("0", "DISABLED", "``NeoPixel`` protocol not active"),
("1", "ENABLED", "``NeoPixel`` protocol active"),
])
],
description="""
Enable free run (signal start & abort)
""")
# Local data
self.b6Table = Signal(6) # Table rover (0..63)
self.b9Offset = Signal(9) # Array rover (0..511)
self.b32GRB = Signal(32) # Current 24-bit(GRB) or 32-bit (GRBW) data to send
self.b12PulseLen = Signal(12) # Current pulse length
self.b5Count24 = Signal(5) # 24-Bit (TODO: or 32-bit) counter
self.b32BaseAddress = Signal(32) # Base DRAM address
storage = Memory(32, n_TABLES) # * n_LEDs)
self.specials += storage
wrport = storage.get_port(write_capable=True)
self.specials += wrport
self.comb += [ # Write to memory
wrport.adr.eq(self.b6StoreOffset.storage), # Index local memory for WRITE
wrport.dat_w.eq(self.b32DRAMAddress.storage), # & store DRAM address
wrport.we.eq(1)
]
rdport = storage.get_port()
self.specials += rdport
self.comb += [ # Read from DRAM addresses memory
rdport.adr.eq(self.b6Table) # Index DRAM address array (READ)
]
if dramtransfer != None: # This is for real!
self.sync += dramtransfer.b9Offset.storage.eq(self.b9Offset) # Index DRAM data value storage
# Output
self.bDataPin = Array(Signal(1) for bit in range(63)) # To be wired to data pins ...
###
fsm = FSM(reset_state="IDLETABLE") # FSM starts idling ...
self.submodules += fsm
fsm.act("IDLETABLE",
If((self.bEnable.storage==True) and (self.b9Len.storage > 0),
NextValue(self.b6Table, 0), # Start @ 1st (DRAM) table data
NextValue(self.b9Offset, 0), # Start @ 1st 24-bit or 32-bit data (mem will be ready next cycle)
NextValue(self.b5Count24, 0), # Bit count 0..23 (TODO: or 0..31)
NextState("TABLELOADLOOP")
)
)
fsm.act("TABLELOADLOOP", # 1st cycle delay for memory port access
NextState("TABLELOAD2")
)
fsm.act("TABLELOAD2", # DRAM address now selected (avail.), hence pull it!
NextValue(self.b32BaseAddress, rdport.dat_r), # Depends upon b6Table
NextState("TABLELOAD3")
)
fsm.act("TABLELOAD3", # Load from DRAM address
NextValue(dramtransfer.b32Address.storage, self.b32BaseAddress),
NextState("TABLELOAD4")
)
fsm.act("TABLELOAD4", # Engage!
NextValue(dramtransfer.bEnable.storage, 1),
NextState("TABLELOADWAIT")
)
fsm.act("TABLELOADWAIT", # Wait for termination of transfer ...
If(dramtransfer.bValid.storage, # Data avail.?
NextValue(dramtransfer.bEnable.storage, 0), # Reset/ACK to DRAM transfer (sort of ...)
NextState("GRBWORDLOOP") # Yap!
)
# TODO: Permit timeout indication ...
)
# G/R/B Word loop entry:
fsm.act("GRBWORDLOOP", # 1st cycle delay for memory port access
NextState("IDLE2")
)
fsm.act("IDLE2", # 2nd cycle delay ...
NextState("IDLE3")
)
fsm.act("IDLE3",
NextValue(self.b32GRB, dramtransfer.b32Data.storage), # Depends upon b9Offset
NextValue(self.b5Count24, 0), # Bit count 0..23 (TODO: or 0..31)
NextState("PREPAREBIT")
)
# 24-bit (TODO: or 32-bit) loop entry:
# Protocol: T0H=400ns/T0L=850ns, T1H=800ns/T1L=450ns, RST>50µs(>50000ns)
fsm.act("PREPAREBIT",
If(self.b32GRB[23],
NextValue(self.b12PulseLen, 47), # Compensate for 1 state changes w/o action ...),
NextState("T1H")
).Else(
NextValue(self.b12PulseLen, 23), # Compensate for 1 state changes w/o action ...
NextState("T0H")
)
)
fsm.act("T1H",
NextValue(self.bDataPin[self.b6Table], 1),
NextValue(self.b12PulseLen, self.b12PulseLen - 1),
If(self.b12PulseLen == 0,
If(self.b5Count24 < 23, # Not final pulse of word (TODO: Adjust for 32-bit as well!)
NextValue(self.b12PulseLen, 24) # Compensate for 3 state changes w/o action ...
).Else( # Final word pulse special
NextValue(self.b12PulseLen, 21) # Compensate word load cycles
),
NextState("T1L")
)
)
fsm.act("T1L",
NextValue(self.bDataPin[self.b6Table], 0),
NextValue(self.b12PulseLen, self.b12PulseLen - 1),
If(self.b12PulseLen == 0,
NextValue(self.b5Count24, self.b5Count24 + 1), # Next bit (of GRB)
NextValue(self.b32GRB, self.b32GRB << 1), # Next bit (of GRB)
NextState("NEXTBIT")
)
)
fsm.act("T0H",
NextValue(self.bDataPin[self.b6Table], 1),
NextValue(self.b12PulseLen, self.b12PulseLen - 1),
If(self.b12PulseLen == 0,
If(self.b5Count24 < 23, # Not final pulse of word? (TODO: Adjust for 32-bit as well!)
NextValue(self.b12PulseLen, 48) # Compensate for 3 state changes w/o action ...
).Else( # Final word load special
NextValue(self.b12PulseLen, 45) # Compensate for load word cycles
),
NextState("T0L")
)
)
fsm.act("T0L",
NextValue(self.bDataPin[self.b6Table], 0),
NextValue(self.b12PulseLen, self.b12PulseLen - 1),
If(self.b12PulseLen == 0,
NextValue(self.b5Count24, self.b5Count24 + 1), # Next bit (of GRB)
NextValue(self.b32GRB, self.b32GRB << 1), # Next bit (of GRB)
NextState("NEXTBIT")
)
)
fsm.act("NEXTBIT",
If(self.b5Count24 < 24, # Not yet done? (TODO: Adjust for 32-bit as well!)
NextState("PREPAREBIT")
).Else( # GRB word finished. More to come?
NextValue(self.b5Count24,0), # Bit count reset for next word
NextValue(self.b9Offset, self.b9Offset + 1), # Prepare offset for later use
NextState("NEXTWORD1")
)
)
fsm.act("NEXTWORD1",
NextState("NEXTWORD2") # Add one cycle for read port propagation!
)
fsm.act("NEXTWORD2",
NextState("NEXTWORD3") # Add one cycle for read port propagation!
)
fsm.act("NEXTWORD3",
If((self.b9Offset < self.b9Len.storage) & (self.bEnable.storage==True), # Still more words to come (& no exit request)?
NextValue(self.b32GRB, dramtransfer.b32Data.storage), # Depends upon b9Offset!
NextState("PREPAREBIT")
).Else(
NextValue(self.b12PulseLen, 4095), # >50µs required (3000 not ok!)
NextState("RST")
)
)
fsm.act("RST",
NextValue(self.bDataPin[self.b6Table], 0),
NextValue(self.b12PulseLen, self.b12PulseLen - 1),
If(self.b12PulseLen == 0,
NextValue(self.b6Table, self.b6Table + 1),
NextState("NEXTTABLE")
)
)
fsm.act("NEXTTABLE",
If(self.b6Table < self.b6NoOfTables.storage,
NextValue(self.b9Offset, 0), # Start @ 1st 24-bit data (TODO: or 32-bit data)
NextState("TABLELOADLOOP") # Bring in next DRAM table
).Else( # Final table: Restart!
NextState("IDLETABLE")
)
)
def npe_testbench(npe): # TODO: Make it work again ...
print("----- npe testbench -----")
yield npe.b4LoadTable.storage.eq(0)
yield
yield npe.b6LoadOffset.storage.eq(0)
yield
yield npe.b32DRAMAddress.storage.eq(0x110000)
yield
yield npe.b6LoadOffset.storage.eq(1)
yield
yield npe.b32DRAMAddress.storage.eq(0x002200)
yield
yield npe.b6LoadOffset.storage.eq(2)
yield
yield npe.b32DRAMAddress.storage.eq(0x000033)
yield
yield npe.b9Len.storage.eq(3)
yield
yield npe.bEnable.storage.eq(True)
yield
#
for i in range(10000): # Send the whole data & restart ...
print(i, ": ", sep="", end="")
print((yield npe.bDataPin[0])) # Actual pin to move
yield
if i == 5000:
yield npe.bEnable.storage.eq(True) # Enable quickest restart ...
yield
if __name__ == "__main__":
npe = NeoPixelEngine(n_TABLES=2, n_LEDs=3)
run_simulation(npe, npe_testbench(npe), vcd_name="npe.vcd", clocks={"sys":16})