import dataclasses
from dataclasses import dataclass, field, InitVar
from typing import Dict, Tuple, List, Optional, Union
from pydispatch.properties import ObservableList
from tslumd import Screen, Tally, TallyType, TallyKey, TallyColor
from .config import Option, ListOption
__all__ = (
'Pixel', 'Rgb', 'TallyConfig', 'SingleTallyConfig', 'MultiTallyConfig',
)
Pixel = Tuple[int, int] #: A tuple of ``(x, y)`` coordinates
Rgb = Tuple[int, int, int] #: A color tuple of ``(r, g, b)``
TallyOrTallyConfig = Union[Tally, 'SingleTallyConfig']
TallyOrMultiTallyConfig = Union[TallyOrTallyConfig, 'MultiTallyConfig']
TallyColorOption = Option(
name='color_mask', type=str, required=False, title='Color',
serialize_cb=lambda x: x.to_str(),
validate_cb=lambda x: TallyColor.from_str(x),
)
def normalize_screen(obj: Union[TallyKey, TallyOrMultiTallyConfig, int]) -> Union[None, int]:
if obj is None:
return None
elif isinstance(obj, tuple):
obj = obj[0]
if isinstance(obj, int):
screen = obj
if obj == 0xffff:
screen = None
elif isinstance(obj, Tally):
screen = obj.screen.index
if obj.screen.is_broadcast:
screen = None
elif isinstance(obj, Screen):
screen = obj.index
if obj.is_broadcast:
screen = None
else:
screen = obj.screen_index
if obj.is_broadcast_screen:
screen = None
return screen
def normalize_tally_index(obj: Union[TallyKey, TallyOrTallyConfig, int]) -> Union[None, int]:
if isinstance(obj, tuple):
obj = obj[1]
if isinstance(obj, int):
ix = obj
if obj == 0xffff:
ix = None
elif isinstance(obj, Tally):
ix = obj.index
if obj.is_broadcast:
ix = None
else:
ix = obj.tally_index
if obj.is_broadcast_tally:
ix = None
return ix
def get_tally_key(obj: Union[TallyKey, TallyOrTallyConfig]) -> TallyKey:
if isinstance(obj, tuple):
return obj
return obj.id
[docs]@dataclass
class TallyConfig:
"""Configuration data for tally assignment
"""
[docs] def to_dict(self) -> Dict:
"""Serialize the config data
"""
return dataclasses.asdict(self)
[docs] @classmethod
def from_dict(cls, d: Dict) -> 'TallyConfig':
"""Create a :class:`TallyConfig` from serialized data
"""
return cls(**d)
[docs]@dataclass
class SingleTallyConfig(TallyConfig):
"""Configuration for a single tally
"""
tally_index: int
"""The tally index ranging from 0 to 65534 (``0xfffe``)
The special value of 65535 (``0xffff``) is used as a "broadcast" address
"""
tally_type: TallyType = TallyType.no_tally
"""The :class:`~tslumd.common.TallyType`
"""
color_mask: TallyColor = TallyColor.AMBER
"""An optional mask which can limit the color changes
The default (:attr:`~.tslumd.common.TallyColor.AMBER`) allows all changes
"""
screen_index: Optional[int] = None
"""The :attr:`~tslumd.tallyobj.Screen.index` of the
:class:`tslumd.tallyobj.Screen` the tally belongs to,
ranging from 0 to 65534 (``0xfffe``)
If not provided (or ``None``), the tally is assumed to belong to *any* screen.
This is also the case if the value is 65535 (``0xffff``), defined as the
"broadcast" screen address.
"""
name: Optional[str] = ''
"""User-defined name for the tally
"""
[docs] @classmethod
def from_tally(cls, tally: Tally, **kwargs) -> 'SingleTallyConfig':
"""Create a :class:`SingleTallyConfig` from a :class:`~tslumd.tallyobj.Tally`
"""
kwargs.setdefault('tally_type', TallyType.all_tally)
scr, tly = tally.id
scr = normalize_screen(tally)
tly = normalize_tally_index(tally)
return cls(screen_index=scr, tally_index=tly, **kwargs)
@property
def tally_key(self) -> TallyKey:
"""A tuple of (:attr:`screen_index`, :attr:`tally_index`) matching the
format used for :attr:`tslumd.tallyobj.Tally.id`
If :attr:`screen_index` or :attr:`tally_index` is ``None``, they are set
to 65535 (``0xffff``)
"""
scr, tly = self.screen_index, self.tally_index
if scr is None:
scr = 0xffff
if tly is None:
tly = 0xffff
return (scr, tly)
@property
def id(self) -> TallyKey:
"""Alias for :attr:`tally_key` to match
:attr:`Tally.id <tslumd.tallyobj.Tally.id>`
"""
return self.tally_key
@property
def is_broadcast_screen(self) -> bool:
"""``True`` if :attr:`screen_index` is set to ``None`` or the "broadcast"
address of 65535 (``0xffff``)
"""
return self.screen_index in (None, 0xffff)
@property
def is_broadcast_tally(self) -> bool:
"""``True`` if :attr:`tally_index` is set to ``None`` or the "broadcast"
address of 65535 (``0xffff``)
"""
return self.tally_index in (None, 0xffff)
@classmethod
def get_init_options(cls) -> Tuple[Option]:
tt_choices = tuple((tt.name for tt in TallyType))
return (
Option(name='tally_index', type=int, required=True, title='Index'),
Option(
name='tally_type', type=str, required=True, choices=tt_choices,
serialize_cb=lambda x: x.to_str(),
validate_cb=lambda x: TallyType.from_str(x),
title='TallyType',
),
TallyColorOption,
Option(name='screen_index', type=int, required=False, title='Screen'),
Option(name='name', type=str, required=False, default='', title='Name'),
)
[docs] def matches(
self,
other: Union[TallyOrTallyConfig, TallyKey],
tally_type: Optional[TallyType] = TallyType.all_tally,
return_matched: Optional[bool] = False
) -> Union[bool, 'SingleTallyConfig']:
"""Determine whether the given tally argument matches this one
For :attr:`screen_index` the :meth:`matches_screen` method is used
Arguments:
other: Either another :class:`SingleTallyConfig`, a
:class:`tslumd.tallyobj.Tally` instance or a :term:`TallyKey`
tally_type: If provided, a :class:`~tslumd.common.TallyType` member
(or members) to match against
return_matched: If False (the default), only return a boolean result,
otherwise return the matched :class:`SingleTallyConfig` if one
was found.
"""
if not self.matches_screen(other):
return False
if isinstance(other, SingleTallyConfig):
if self.tally_type & other.tally_type == TallyType.no_tally:
return False
if self.tally_type & tally_type == TallyType.no_tally:
return False
self_ix = normalize_tally_index(self)
oth_ix = normalize_tally_index(other)
if None in (self_ix, oth_ix):
r = True
else:
r = self_ix == oth_ix
if r and return_matched:
return self
return r
[docs] def matches_screen(self, other: Union[TallyOrMultiTallyConfig, TallyKey, int]) -> bool:
"""Determine whether the :attr:`screen_index` matches the given argument
For :class:`tslumd.tallyobj.Tally`, the
:attr:`screen's <tslumd.tallyobj.Tally.screen>`
:attr:`~tslumd.tallyobj.Screen.is_broadcast` value is taken into account
as well as cases where :attr:`screen_index` is set to ``None``
Arguments:
other: A :class:`SingleTallyConfig`, :class:`MultiTallyConfig`,
:class:`tslumd.tallyobj.Tally` or :class:`int`
"""
if self.is_broadcast_screen:
return True
self_screen = normalize_screen(self)
oth_screen = normalize_screen(other)
if None not in (self_screen, oth_screen):
return self_screen == oth_screen
return True
[docs] def to_dict(self) -> Dict:
d = super().to_dict()
d['tally_type'] = d['tally_type'].to_str()
return d
[docs] @classmethod
def from_dict(cls, d: Dict) -> 'SingleTallyConfig':
kw = d.copy()
if not isinstance(kw['tally_type'], TallyType):
kw['tally_type'] = TallyType.from_str(kw['tally_type'])
return super().from_dict(kw)
[docs] def create_screen(self) -> Screen:
"""Create a :class:`tslumd.tallyobj.Screen` with the :attr:`screen_index`
of this object
"""
if self.screen_index is None:
return Screen.broadcast()
return Screen(self.screen_index)
[docs] def create_tally(self, screen: Optional[Screen] = None) -> Tuple[Screen, Tally]:
"""Create a :class:`tslumd.tallyobj.Tally` from this instance
Arguments:
screen (tslumd.tallyobj.Screen, optional): The parent
:attr:`tslumd.tallyobj.Tally.screen` to add the tally to.
If not provided, one will be created
Returns
-------
screen : tslumd.tallyobj.Screen
The parent screen that was either created or given as an argument
tally : tslumd.tallyobj.Tally
The tally object
"""
if screen is None:
screen = self.create_screen()
if self.tally_index is None:
tally = screen.broadcast_tally()
else:
tally = screen.add_tally(self.tally_index)
return screen, tally
[docs]@dataclass
class MultiTallyConfig(TallyConfig):
"""Configuration for multiple tallies
"""
tallies: InitVar[List[SingleTallyConfig]] = None
"""A list of :class:`SingleTallyConfig` instances
"""
screen_index: Optional[int] = None
"""The :attr:`~tslumd.tallyobj.Screen.index` of the
:class:`tslumd.tallyobj.Screen` for the configuration
ranging from 0 to 65534 (``0xfffe``)
This only takes effect if :attr:`allow_all` is ``True`` and provides a method
of filtering the tally assignments to a single :class:`tslumd.tallyobj.Screen`
if desired.
If not provided (or ``None``), all tallies within all screens are considered
to be members of the configuration. This is also the case if the value
is 65535 (``0xffff``), defined as the "broadcast" screen address.
"""
allow_all: bool = False
"""If ``True``, all possible tally configurations exist within this instance
Tallies can still be limited to a specific :attr:`screen_index` if desired
"""
name: Optional[str] = ''
"""User-defined name for the tally config
"""
def __post_init__(self, tallies):
if tallies is None:
tallies = []
self._tallies_by_key = None
self.copy_on_change = False
self.tallies = ObservableList(tallies, obj=self, property=self)
def _on_change(self, obj, old, value, **kwargs):
"""This is a callback from :class:`pydispatch.properties.ObservableList`
"""
self._memoized_tally_confs = None
@property
def memoized_tally_confs(self) -> Dict[TallyKey, Dict[TallyType, SingleTallyConfig]]:
r = getattr(self, '_memoized_tally_confs', None)
if r is None:
r = self._memoized_tally_confs = {}
return r
@property
def is_broadcast_screen(self) -> bool:
"""``True`` if :attr:`screen_index` is set to ``None`` or the "broadcast"
address of 65535 (``0xffff``)
Note:
Behavior is undefined if :attr:`allow_all` is ``False``
"""
if not self.allow_all:
return True
return self.screen_index in (None, 0xffff)
@classmethod
def get_init_options(cls) -> Tuple[Option]:
return (
ListOption(
name='tallies', type=SingleTallyConfig, required=False,
sub_options=SingleTallyConfig.get_init_options(),
title='Tallies',
),
Option(name='screen_index', type=int, required=False, title='Screen'),
Option(
name='allow_all', type=bool, required=False, default=False,
title='Allow All',
),
Option(name='name', type=str, required=False, default='', title='Name'),
)
[docs] def matches(
self,
tally: Union[TallyOrTallyConfig, TallyKey],
tally_type: Optional[TallyType] = TallyType.all_tally,
return_matched: Optional[bool] = False
) -> Union[bool, SingleTallyConfig]:
"""Alias for :meth:`contains`
"""
return self.contains(tally, tally_type, return_matched)
[docs] def matches_screen(self, other: Union[TallyOrMultiTallyConfig, TallyKey, int]) -> bool:
"""Determine whether the :attr:`screen_index` matches the given argument
For :class:`tslumd.tallyobj.Tally`, the
:attr:`screen's <tslumd.tallyobj.Tally.screen>`
:attr:`~tslumd.tallyobj.Screen.is_broadcast` value is taken into account
as well as cases where :attr:`screen_index` is set to ``None``
Arguments:
other: A :class:`SingleTallyConfig`, :class:`MultiTallyConfig`,
:class:`tslumd.tallyobj.Tally` or :class:`int`
Note:
Behavior is undefined if :attr:`allow_all` is ``False``
"""
if self.screen_index is None:
return True
if isinstance(other, SingleTallyConfig):
return other.matches_screen(self)
self_screen = normalize_screen(self)
oth_screen = normalize_screen(other)
if None not in (self_screen, oth_screen):
return self_screen == oth_screen
return True
[docs] def contains(
self,
tally: Union[TallyOrTallyConfig, TallyKey],
tally_type: Optional[TallyType] = TallyType.all_tally,
return_matched: Optional[bool] = False
) -> Union[bool, SingleTallyConfig]:
"""Determine whether the given tally argument is included in this
configuration
The :meth:`matches_screen` method is used to match the :attr:`screen_index`.
If :attr:`allow_all` is ``False``, each object in :attr:`tallies` is
:meth:`checked <SingleTallyConfig.matches>`
Arguments:
tally: Either a :class:`SingleTallyConfig` or a
:class:`tslumd.tallyobj.Tally` instance
"""
memoized = self.search_memoized(tally, tally_type)
if memoized is not None:
if return_matched:
return memoized
return True
if self.allow_all:
if not self.matches_screen(tally):
return False
if return_matched:
ret = self._create_single_conf(tally, tally_type)
self.add_memoized(ret)
return ret
return True
t = self._search_tallies(tally, tally_type)
if t is not None:
self.add_memoized(t)
if return_matched:
return t
return True
return False
def _search_tallies(
self,
tally: Union[TallyOrTallyConfig, TallyKey],
tally_type: Optional[TallyType] = TallyType.all_tally
) -> Optional[SingleTallyConfig]:
for t in self.tallies:
if t.matches(tally, tally_type):
return t
def search_memoized(
self,
obj: Union[TallyOrTallyConfig],
tally_type: Optional[TallyType] = TallyType.all_tally
) -> Optional[SingleTallyConfig]:
memo = self.memoized_tally_confs
t_id = get_tally_key(obj)
if t_id not in memo:
return None
if isinstance(obj, SingleTallyConfig):
ttype = obj.tally_type
else:
ttype = tally_type
return memo[t_id].get(ttype)
def add_memoized(self, obj: SingleTallyConfig):
memo = self.memoized_tally_confs
if obj.id not in memo:
memo[obj.id] = {}
# assert obj.tally_type not in memo[obj.id], 'obj exists'
if obj.tally_type in memo[obj.id]:
oth = memo[obj.id][obj.tally_type]
assert obj == oth
memo[obj.id][obj.tally_type] = obj
[docs] def to_dict(self) -> Dict:
tallies = [c.to_dict() for c in self.tallies]
return {'tallies':tallies, 'allow_all':self.allow_all}
[docs] @classmethod
def from_dict(cls, d: Dict) -> 'MultiTallyConfig':
kw = d.copy()
tallies = kw['tallies']
kw['tallies'] = [SingleTallyConfig.from_dict(td) for td in tallies]
return super().from_dict(kw)
def _create_single_conf(
self,
tally: Union[TallyOrTallyConfig, TallyKey],
tally_type: Optional[TallyType] = TallyType.all_tally
) -> SingleTallyConfig:
if isinstance(tally, SingleTallyConfig):
ret = tally
elif not isinstance(tally, Tally):
scr, tly = tally
ret = SingleTallyConfig(
screen_index=scr,
tally_index=tly,
tally_type=tally_type,
)
else:
ret = SingleTallyConfig.from_tally(tally, tally_type=tally_type)
return ret
SingleTallyOption = Option(
name='config', type=SingleTallyConfig, required=True,
sub_options=SingleTallyConfig.get_init_options(),
title='Tally Config',
)
MultiTallyOption = Option(
name='config', type=MultiTallyConfig, required=True,
sub_options=MultiTallyConfig.get_init_options(),
title='Multi Tally Config',
)