Source code for tallypi.outputs.rgbmatrix5x5

"""An RGB LED display of 5x5 pixels made by `pimoroni`_

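Currently uses the `library`_ maintained by the manufacturer for device
communication.

.. _pimoroni: https://shop.pimoroni.com/products/5x5-rgb-matrix-breakout
.. _library: https://github.com/pimoroni/rgbmatrix5x5-python
"""
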
from loguru import logger
import asyncio
from typing import Dict, List, Tuple, Set, Iterable, Optional, Any, ClassVar, Union
import rgbmatrix5x5
from tslumd import TallyType, TallyColor, Tally, TallyKey

from tallypi.common import (
    SingleTallyOption, SingleTallyConfig, MultiTallyConfig, Pixel, Rgb,
from tallypi.baseio import BaseOutput
from tallypi.config import Option

# Names exported by ``from tallypi.outputs.rgbmatrix5x5 import *``
__all__ = ('Indicator', 'Matrix')

# Key type of ``Matrix.tally_type_map``: a two-int tally key extended with
# the TallyType shown by the mapped pixel
TallyTypeKey = Tuple[int, int, TallyType]

class Base(BaseOutput, namespace='rgbmatrix5x5'):
    """Base class for RGBMatrix5x5 displays

    Arguments:
        config(SingleTallyConfig): The initial value for
            :attr:`~tallypi.baseio.BaseIO.config`
        brightness_scale(float, optional): The value to set for
            :attr:`brightness_scale`. Default is 1.0 (``None`` is treated
            as 1.0)
    """

    color_map: ClassVar[Dict[TallyColor, Rgb]] = {
        TallyColor.OFF: (0, 0, 0),
        TallyColor.RED: (255, 0, 0),
        TallyColor.GREEN: (0, 255, 0),
        TallyColor.AMBER: (0xff, 0xbf, 0),
    }
    """Mapping of :class:`tslumd.common.TallyColor` to tuples of
    :data:`~tallypi.common.Rgb`
    """

    brightness_scale: float
    """A multiplier (from 0.0 to 1.0) used to limit the maximum brightness.
    A value of 1.0 produces the full range while 0.5 scales to half brightness.
    """

    device: Optional['rgbmatrix5x5.RGBMatrix5x5']
    """The :class:`rgbmatrix5x5.RGBMatrix5x5` instance
    (``None`` until :meth:`open` is called and after :meth:`close`)
    """

    def __init__(self, config: SingleTallyConfig, brightness_scale: Optional[float] = 1.0):
        self.device = None
        # The parameter is Optional, but the value is later multiplied with
        # brightness levels, so fall back to the documented default for None
        self.brightness_scale = 1.0 if brightness_scale is None else brightness_scale
        super().__init__(config)

    @classmethod
    def get_init_options(cls) -> Tuple[Option, ...]:
        """Get the :class:`~tallypi.config.Option` definitions describing
        the arguments accepted by ``__init__``
        """
        return (
            SingleTallyOption,
            Option(
                name='brightness_scale', type=float, required=False, default=1.0,
                title='Brightness Scale',
            ),
        )

    async def open(self):
        """Create the :attr:`device` instance and initialize
        """
        if self.running:
            return
        if self.device is None:
            self.device = rgbmatrix5x5.RGBMatrix5x5()
        self.running = True

    async def close(self):
        """Close the :attr:`device`

        All pixels are cleared and the :attr:`device` reference is dropped
        """
        if not self.running:
            return
        self.running = False
        if self.device is not None:
            # clear() only resets the pixel buffer; show() is needed to push
            # the blank frame out to the LEDs
            self.device.clear()
            self.device.show()
            self.device = None
class Indicator(Base, namespace='Indicator', final=True):
    """Show a solid color for a single :class:`~tslumd.tallyobj.Tally`

    Arguments:
        config(SingleTallyConfig): The initial value for
            :attr:`~tallypi.baseio.BaseIO.config`
        brightness_scale(float, optional): The value to set for
            :attr:`~.Base.brightness_scale`. Default is 1.0
    """

    def __init__(self, config: SingleTallyConfig, brightness_scale: Optional[float] = 1.0):
        # Last color / brightness sent to the device (None until first set)
        self._color = None
        self._brightness = None
        super().__init__(config, brightness_scale)

    async def set_color(self, color: TallyColor):
        """Set all pixels of the :attr:`device` to the given color

        The rgb values are retrieved from the :attr:`~.Base.color_map`

        Arguments:
            color: The :class:`~tslumd.common.TallyColor`
        """
        if not self.running:
            return
        rgb = self.color_map[color]
        self.device.set_all(*rgb)
        if self._brightness is None:
            # No brightness has been applied yet; set_brightness() also
            # pushes the buffer to the display
            await self.set_brightness(1.0)
        else:
            self.device.show()
        # Always remember the color so an unchanged tally is not re-sent
        self._color = color

    async def set_brightness(self, brightness: float):
        """Set the brightness of the device

        The value is multiplied by :attr:`~.Base.brightness_scale` before
        being sent to the device

        Arguments:
            brightness: The brightness value from ``0.0`` to ``1.0``
        """
        if not self.running:
            return
        self.device.set_brightness(brightness * self.brightness_scale)
        # Brightness changes only become visible once the buffer is shown
        self.device.show()
        self._brightness = brightness

    async def on_receiver_tally_change(self, tally: Tally, props_changed: Set[str], **kwargs):
        """Update the display color and brightness when a matching tally changes

        Arguments:
            tally: The :class:`~tslumd.tallyobj.Tally` that changed
            props_changed: Names of the changed properties (not inspected here)
        """
        if not self.running:
            return
        if not self.tally_matches(tally):
            return
        color = self.get_merged_tally(tally, self.config.tally_type)
        if color != self._color:
            await self.set_color(color)
        brightness = tally.normalized_brightness
        if brightness != self._brightness:
            await self.set_brightness(brightness)
class Matrix(Base, namespace='Matrix', final=True):
    """Show the status of up to 5 tallies in a matrix

    The tallies are shown in 5 rows beginning with the
    :attr:`~.common.SingleTallyConfig.tally_index` of the
    :attr:`~.baseio.BaseIO.config`

    The columns show the individual :class:`~tslumd.common.TallyType` values
    ``('rh_tally', 'txt_tally', 'lh_tally')``
    """

    colors: Dict[Pixel, TallyColor]
    update_queue: asyncio.Queue
    multi_config: MultiTallyConfig
    tally_type_map: Dict[TallyTypeKey, Pixel]

    def __init__(self, config: SingleTallyConfig, brightness_scale: Optional[float] = 1.0):
        super().__init__(config, brightness_scale)
        # Every pixel of the 5x5 grid starts out as OFF
        self.colors = {(x, y): TallyColor.OFF for y in range(5) for x in range(5)}
        self.tally_type_map = {}
        self.build_multi_config()
        self.update_queue = asyncio.Queue()
        self._update_task = None

    def build_multi_config(self) -> MultiTallyConfig:
        """Rebuild :attr:`multi_config` and :attr:`tally_type_map`

        One :class:`~tallypi.common.SingleTallyConfig` is created per
        tally index (5 consecutive indices starting at the configured
        ``tally_index``) and per member of ``TallyType.all()``.

        Returns:
            The newly built :attr:`multi_config`
        """
        self.tally_type_map.clear()
        tconfs = []
        scr = self.config.screen_index
        start_index = self.config.tally_index
        for i in range(5):
            tally_index = start_index + i
            for j, ttype in enumerate(TallyType.all()):
                pixel = (j, i)
                tconf = SingleTallyConfig(
                    screen_index=scr,
                    tally_index=tally_index,
                    tally_type=ttype,
                )
                tconfs.append(tconf)
                key = tconf.tally_key + (ttype,)
                self.tally_type_map[key] = pixel
        self.multi_config = MultiTallyConfig(tallies=tconfs)
        return self.multi_config

    async def open(self):
        """Open the device, queue a refresh of every pixel and start
        the :meth:`update_loop` task
        """
        if self.running:
            return
        await self.queue_update(*self.keys())
        await super().open()
        self.device.set_brightness(self.brightness_scale)
        self._update_task = asyncio.create_task(self.update_loop())

    async def close(self):
        """Close the device and stop the :meth:`update_loop` task
        """
        if not self.running:
            return
        await super().close()
        t = self._update_task
        self._update_task = None
        # Drop pending updates so the stop sentinel is the next item seen
        self.clear_queue()
        if t is not None:
            await self.update_queue.put(None)
            await t
        self.clear_queue()

    def tally_matches(
        self,
        tally: Tally,
        tally_type: Optional[TallyType] = TallyType.all_tally,
        return_matched: Optional[bool] = False
    ) -> Union[bool, SingleTallyConfig]:
        """Match the tally against :attr:`multi_config` instead of the
        single :attr:`~.baseio.BaseIO.config`
        """
        return self.multi_config.matches(tally, tally_type, return_matched)

    @logger.catch
    async def on_receiver_tally_change(self, tally: Tally, props_changed: Set[str], **kwargs):
        """Store the new color of each changed tally type and queue the
        affected pixels for redraw

        Arguments:
            tally: The :class:`~tslumd.tallyobj.Tally` that changed
            props_changed: Names of the changed properties; only names that
                are members of :class:`~tslumd.common.TallyType` are handled
        """
        changed = set()
        for prop in props_changed:
            if prop not in TallyType.__members__:
                continue
            ttype = TallyType.from_str(prop)
            pixel = self.tally_type_map.get(tally.id + (ttype,))
            if pixel is None:
                # Tally is not displayed by this matrix. Queuing ``None``
                # would be taken as the stop sentinel by update_loop()
                continue
            color = self.get_merged_tally(tally, ttype)
            if color == self.get(pixel):
                continue
            self[pixel] = color
            changed.add(pixel)
        await self.queue_update(*changed)

    def clear_queue(self):
        """Discard every item currently waiting in :attr:`update_queue`
        """
        while True:
            try:
                self.update_queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            self.update_queue.task_done()

    async def queue_update(self, *keys):
        """Place the given pixel keys on the :attr:`update_queue`
        """
        for key in keys:
            await self.update_queue.put(key)

    async def update_loop(self):
        """Consume pixel keys from :attr:`update_queue` and draw them

        Runs until :attr:`running` is cleared or ``None`` is received.
        The display is refreshed once the queue has been emptied so that a
        burst of pixel changes results in a single ``show()`` call.
        """
        def update_pixel(key: Pixel):
            # NOTE(review): the key is unpacked as (y, x) and then passed as
            # set_pixel(x, y, ...) -- presumably an intentional transposition
            # for the physical layout; confirm against hardware
            y, x = key
            color = self[key]
            rgb = self.color_map[color]
            self.device.set_pixel(x, y, *rgb)

        while self.running:
            item = await self.update_queue.get()
            if item is None:
                self.update_queue.task_done()
                break
            if self.device is None:
                # Nothing to draw on; the item is dropped but still counted
                # as processed to keep the queue's bookkeeping balanced
                self.update_queue.task_done()
                await asyncio.sleep(.1)
                continue
            update_pixel(item)
            self.update_queue.task_done()
            if self.update_queue.empty():
                self.device.show()

    def __getitem__(self, key: Pixel) -> TallyColor:
        return self.colors[key]

    def __setitem__(self, key: Pixel, color: TallyColor):
        self.colors[key] = color

    def get(self, key: Pixel, default: Any = None) -> Optional[TallyColor]:
        """Get the color stored for a pixel, or ``default`` if there is none
        """
        return self.colors.get(key, default)

    def keys(self) -> Iterable[Pixel]:
        """Iterate over the pixel keys in sorted order
        """
        yield from self

    def values(self) -> Iterable[TallyColor]:
        """Iterate over the stored colors, ordered by pixel key
        """
        for key in self:
            yield self[key]

    def items(self) -> Iterable[Tuple[Pixel, TallyColor]]:
        """Iterate over ``(pixel, color)`` pairs, ordered by pixel key
        """
        for key in self:
            yield key, self[key]

    def __iter__(self) -> Iterable[Pixel]:
        yield from sorted(self.colors.keys())