Source code for tallypi.outputs.rgbmatrix5x5

"""An RGB LED display of 5x5 pixels made by `pimoroni`_.

Currently uses the `library`_ maintained by the manufacturer for device
communication.


.. _pimoroni: https://shop.pimoroni.com/products/5x5-rgb-matrix-breakout
.. _library: https://github.com/pimoroni/rgbmatrix5x5-python

"""
from loguru import logger
import asyncio
from typing import Dict, List, Tuple, Set, Iterable, Optional, Any, ClassVar, Union
import rgbmatrix5x5
from tslumd import TallyType, TallyColor, Tally, TallyKey

from tallypi.common import (
    SingleTallyOption, SingleTallyConfig, MultiTallyConfig, Pixel, Rgb,
)
from tallypi.baseio import BaseOutput
from tallypi.config import Option

# Names exported by ``from tallypi.outputs.rgbmatrix5x5 import *``.
# ``Base`` is not included in this tuple.
__all__ = ('Indicator', 'Matrix')

# Key type for ``Matrix.tally_type_map``: a tally key with a ``TallyType``
# appended (built as ``tconf.tally_key + (ttype,)``).
# NOTE(review): the two ints are presumably (screen_index, tally_index) - confirm
TallyTypeKey = Tuple[int, int, TallyType]

class Base(BaseOutput, namespace='rgbmatrix5x5'):
    """Shared behavior for outputs driving an RGBMatrix5x5 display

    Arguments:
        config(SingleTallyConfig): The initial value for
            :attr:`~tallypi.baseio.BaseIO.config`
        brightness_scale(float, optional): The value to set for
            :attr:`brightness_scale`. Default is 1.0

    """

    color_map: ClassVar[Dict[TallyColor, Rgb]] = {
        TallyColor.OFF: (0, 0, 0),
        TallyColor.RED: (255, 0, 0),
        TallyColor.GREEN: (0, 255, 0),
        TallyColor.AMBER: (255, 191, 0),
    }
    """Mapping of :class:`tslumd.common.TallyColor` to tuples of
    :data:`~tallypi.common.Rgb`
    """

    brightness_scale: float
    """A multiplier (from 0.0 to 1.0) used to limit the maximum brightness.
    A value of 1.0 produces the full range while 0.5 scales to half brightness.
    """

    device: 'rgbmatrix5x5.RGBMatrix5x5'
    """The :class:`rgbmatrix5x5.RGBMatrix5x5` instance
    (``None`` until :meth:`open` has created it)
    """

    def __init__(self, config: SingleTallyConfig, brightness_scale: Optional[float] = 1.0):
        # The device object is only created once open() is awaited
        self.brightness_scale = brightness_scale
        self.device = None
        super().__init__(config)

    @classmethod
    def get_init_options(cls) -> Tuple[Option]:
        """Return the init options: the single-tally option followed by
        an optional ``brightness_scale`` float option
        """
        brightness_option = Option(
            name='brightness_scale',
            type=float,
            required=False,
            default=1.0,
            title='Brightness Scale',
        )
        return (SingleTallyOption, brightness_option)

    async def open(self):
        """Create the :attr:`device` instance (if needed) and mark the
        output as running

        Does nothing when the output is already running.
        """
        if not self.running:
            if self.device is None:
                # Brightness is not configured here; Indicator.set_brightness
                # and Matrix.open apply it after this call
                self.device = rgbmatrix5x5.RGBMatrix5x5()
            self.running = True

    async def close(self):
        """Blank the display and release the :attr:`device`

        Does nothing when the output is not running.
        """
        if not self.running:
            return
        self.running = False
        device = self.device
        if device is None:
            return
        # Clear the pixel buffer and push it out so the LEDs turn off
        device.clear()
        device.show()
        self.device = None
class Indicator(Base, namespace='Indicator', final=True):
    """Display one solid color reflecting a single
    :class:`~tslumd.tallyobj.Tally`
    """

    def __init__(self, config: SingleTallyConfig, brightness_scale: Optional[float] = 1.0):
        # Last color / brightness sent to the device (None = nothing sent yet)
        self._color = self._brightness = None
        super().__init__(config, brightness_scale)

    async def set_color(self, color: TallyColor):
        """Fill every pixel of the :attr:`device` with the given color

        The rgb values are looked up in the :attr:`~.Base.color_map`.
        Ignored while the output is not running.

        Arguments:
            color: The :class:`~tslumd.common.TallyColor`
        """
        if not self.running:
            return
        components = self.color_map[color]
        self.device.set_all(*components)
        if self._brightness is not None:
            self.device.show()
        else:
            # No brightness applied yet: set_brightness() also calls show()
            await self.set_brightness(1.0)
        self._color = color

    async def set_brightness(self, brightness: float):
        """Apply a brightness to the device, scaled by
        :attr:`~.Base.brightness_scale`

        Ignored while the output is not running.

        Arguments:
            brightness: The brightness value from ``0.0`` to ``1.0``
        """
        if not self.running:
            return
        scaled = brightness * self.brightness_scale
        self.device.set_brightness(scaled)
        self.device.show()
        # The unscaled value is remembered for later change detection
        self._brightness = brightness

    async def on_receiver_tally_change(self, tally: Tally, props_changed: Set[str], **kwargs):
        """Update color and brightness when a matching tally changes

        The device is only written to for values that differ from the
        last ones sent.
        """
        if not (self.running and self.tally_matches(tally)):
            return
        merged = self.get_merged_tally(tally, self.config.tally_type)
        if merged != self._color:
            await self.set_color(merged)
        level = tally.normalized_brightness
        if level != self._brightness:
            await self.set_brightness(level)
class Matrix(Base, namespace='Matrix', final=True):
    """Show the status of up to 5 tallies in a matrix

    The tallies are shown in 5 rows beginning with the
    :attr:`~.common.SingleTallyConfig.tally_index` of the
    :attr:`~.baseio.BaseIO.config`

    The columns show the individual :class:`~tslumd.common.TallyType` values
    ``('rh_tally', 'txt_tally', 'lh_tally')``
    """

    # Current color of every pixel, keyed by pixel coordinate
    colors: Dict[Pixel, TallyColor]

    # Pixels waiting to be written to the device; ``None`` is the stop
    # sentinel for update_loop()
    update_queue: asyncio.Queue

    # Configuration covering every tally / tally type shown in the matrix
    multi_config: MultiTallyConfig

    # Maps ``tally_key + (tally_type,)`` to the pixel displaying it
    tally_type_map: Dict[TallyTypeKey, Pixel]

    def __init__(self, config: SingleTallyConfig, brightness_scale: Optional[float] = 1.0):
        super().__init__(config, brightness_scale)
        self.colors = {(x, y): TallyColor.OFF for y in range(5) for x in range(5)}
        self.tally_type_map = {}
        self.build_multi_config()
        self.update_queue = asyncio.Queue()
        self._update_task = None

    def build_multi_config(self) -> MultiTallyConfig:
        """Rebuild :attr:`multi_config` and :attr:`tally_type_map`

        Five consecutive tally indices are used, starting at the
        ``tally_index`` of the config, each combined with every value of
        ``TallyType.all()``.

        Returns:
            The newly built :attr:`multi_config`
        """
        self.tally_type_map.clear()
        tconfs = []
        scr = self.config.screen_index
        start_index = self.config.tally_index
        for i in range(5):
            tally_index = start_index + i
            for j, ttype in enumerate(TallyType.all()):
                pixel = (j, i)
                tconf = SingleTallyConfig(
                    screen_index=scr,
                    tally_index=tally_index,
                    tally_type=ttype,
                )
                tconfs.append(tconf)
                key = tconf.tally_key + (ttype,)
                self.tally_type_map[key] = pixel
        self.multi_config = MultiTallyConfig(tallies=tconfs)
        # The signature has always declared this return value
        return self.multi_config

    async def open(self):
        """Open the device, queue a redraw of every pixel and start
        the update task

        Does nothing when the output is already running.
        """
        if self.running:
            return
        # Queue all pixels so the tracked colors are drawn once the loop starts
        await self.queue_update(*self.keys())
        await super().open()
        self.device.set_brightness(self.brightness_scale)
        self._update_task = asyncio.create_task(self.update_loop())

    async def close(self):
        """Close the device and stop the update task

        Does nothing when the output is not running.
        """
        if not self.running:
            return
        await super().close()
        t = self._update_task
        self._update_task = None
        self.clear_queue()
        if t is not None:
            # Wake the loop with the stop sentinel and wait for it to finish
            await self.update_queue.put(None)
            await t
        # The loop may have exited without consuming the sentinel
        self.clear_queue()

    def tally_matches(
        self,
        tally: Tally,
        tally_type: Optional[TallyType] = TallyType.all_tally,
        return_matched: Optional[bool] = False
    ) -> Union[bool, SingleTallyConfig]:
        """Match the tally against :attr:`multi_config` (all displayed
        tallies) instead of the single config
        """
        return self.multi_config.matches(tally, tally_type, return_matched)

    @logger.catch
    async def on_receiver_tally_change(self, tally: Tally, props_changed: Set[str], **kwargs):
        """Store the new color for each changed tally type and queue the
        affected pixels for redraw

        Properties that are not :class:`~tslumd.common.TallyType` member
        names and tallies that are not displayed in the matrix are ignored.
        """
        changed = set()
        for prop in props_changed:
            if prop not in TallyType.__members__:
                continue
            ttype = TallyType.from_str(prop)
            pixel = self.tally_type_map.get(tally.id + (ttype,))
            if pixel is None:
                # Not displayed here. Queuing ``None`` would be read as the
                # stop sentinel by update_loop() and storing it would add a
                # non-pixel key to ``colors``
                continue
            color = self.get_merged_tally(tally, ttype)
            if color == self.get(pixel):
                continue
            self[pixel] = color
            changed.add(pixel)
        await self.queue_update(*changed)

    def clear_queue(self):
        """Discard every item currently waiting in :attr:`update_queue`"""
        queue = self.update_queue
        while True:
            try:
                queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            try:
                # Keep the unfinished-task count in step with the removal
                queue.task_done()
            except ValueError:
                # Count already at zero; nothing left to balance
                pass

    async def queue_update(self, *keys):
        """Put each of the given pixel keys on :attr:`update_queue`"""
        coros = set()
        for key in keys:
            coros.add(self.update_queue.put(key))
        if coros:
            await asyncio.gather(*coros)

    async def update_loop(self):
        """Write queued pixels to the device until stopped

        The loop ends when ``running`` becomes false or the ``None``
        sentinel is received. ``device.show()`` is only called once the
        queue has been drained so a batch of pixels is sent together.
        """
        def update_pixel(key: Pixel):
            # NOTE(review): the key is unpacked as (y, x) although
            # build_multi_config() stores pixels as (type_index, tally_offset)
            # - confirm the intended orientation
            y, x = key
            color = self[key]
            rgb = self.color_map[color]
            self.device.set_pixel(x, y, *rgb)

        while self.running:
            item = await self.update_queue.get()
            if item is None:
                self.update_queue.task_done()
                break
            if self.device is None:
                # The item is dropped; still mark it done so the queue's
                # unfinished-task count stays balanced
                self.update_queue.task_done()
                await asyncio.sleep(.1)
                continue
            update_pixel(item)
            self.update_queue.task_done()
            if self.update_queue.empty():
                self.device.show()

    def __getitem__(self, key: Pixel) -> TallyColor:
        return self.colors[key]

    def __setitem__(self, key: Pixel, color: TallyColor):
        self.colors[key] = color

    def get(self, key: Pixel, default: Any = None) -> Optional[TallyColor]:
        """Return the color stored for ``key`` or ``default`` if absent"""
        return self.colors.get(key, default)

    def keys(self) -> Iterable[Pixel]:
        """Iterate over the pixel keys in sorted order"""
        yield from self

    def values(self) -> Iterable[TallyColor]:
        """Iterate over the colors, ordered by their sorted pixel keys"""
        for key in self:
            yield self[key]

    def items(self) -> Iterable[Tuple[Pixel, TallyColor]]:
        """Iterate over ``(pixel, color)`` pairs in sorted pixel order"""
        for key in self:
            yield key, self[key]

    def __iter__(self) -> Iterable[Pixel]:
        yield from sorted(self.colors)