Implement Flux Architecture with Python

What is Flux

Flux is a software architecture that Facebook uses for building client-side web applications. It complements React’s composable view components by using a unidirectional data flow. It is more of a pattern than a formal framework, so you can start using Flux immediately without writing a lot of new code.

Python Implementation

  1import sys
  2import threading
  3
  4from log_util import log_info
  5from typing import Any, Callable, Dict, Generic, List, Optional, TypedDict, Union, cast, TypeVar
  6
  7
# Event identifier: a list of string segments, e.g. ['agent', 'spawn'].
_EventType = List[str]
# Data that travels with an event.
_Payload = Dict
# Type of the state object held by a Dispatcher.
_TState = TypeVar('_TState')
# Reducer that sees every event: (state, event_type, payload) -> state.
_Reducer = Callable[[_TState, _EventType, _Payload], _TState]
# Reducer registered for one event type: (state, payload) -> state.
_BoundReducer = Callable[[_TState, _Payload], _TState]
 13
 14_print_lock = threading.Lock()
 15
 16def log_error(msg: str):
 17    COLOR='\033[0;31m' # Red
 18    NC='\033[0m'
 19    with _print_lock:
 20        if sys.stdout.isatty():
 21            print(f'[ERROR]{COLOR}{msg}{NC}')
 22        else:
 23            print('ERROR]'+msg)
 24
 25def log_info(msg: str):
 26    COLOR='\033[0;32m' # Green
 27    NC='\033[0m'
 28    with _print_lock:
 29        if sys.stdout.isatty():
 30            print(f'[INFO]{COLOR}{msg}{NC}')
 31        else:
 32            print('[INFO]' + msg)
 33
 34
 35def _log_event(state, event_type, payload):
 36    log_info(f'[Automata] event_type: {event_type}, payload: {payload}')
 37    return state
 38
 39class Dispatcher(Generic[_TState]):
 40    def __init__(self, initial_state: _TState, debug=False, show_event_source=False):
 41        self._state: _TState = initial_state
 42        self._debug = debug
 43        self._show_event_source = show_event_source
 44        self._reducers = []
 45        if debug:
 46            self._reducers.append(_log_event)
 47        self._event_queue = []
 48        self._is_dispatching = False
 49
 50    def register(self, reducer: Union[_BoundReducer, _Reducer], event_type: Optional[_EventType] = None):
 51        if event_type is None:
 52            self._reducers.append(reducer)
 53            return self
 54        
 55        def reducer_wrapper(state, event_type_, payload):
 56            if event_type == event_type_:
 57                func = cast(_BoundReducer, reducer)
 58                return func(state, payload)
 59            return state
 60
 61        self._reducers.append(reducer_wrapper)
 62        return self
 63
 64    def _process(self):
 65        if len(self._event_queue) == 0 or self._is_dispatching:
 66            return
 67        self._is_dispatching = True
 68        while not len(self._event_queue) == 0:
 69            event_type, payload, event_source = self._event_queue.pop(0)
 70            if event_source is not None:
 71                log_info(f'[Automata] debug info: {event_source}')
 72            for reducer in self._reducers:
 73                self._state = reducer(self._state, event_type, payload)
 74        if self._debug and hasattr(self._state, 'print'):
 75            self._state.print() # type: ignore
 76        self._is_dispatching = False
 77
 78    def dispatch_later(self, event_type: _EventType, payload: _Payload):
 79        event_source = None
 80        if self._debug and self._show_event_source:
 81            import traceback
 82            event_source = traceback.format_stack()
 83        self._event_queue.append((event_type, payload, event_source))
 84        self._process()
 85    
 86    def dispatch(self, event_type: _EventType, payload: _Payload):
 87        for reducer in self._reducers:
 88            self._state = reducer(self._state, event_type, payload)
 89        if self._debug and hasattr(self._state, 'print'):
 90            self._state.print() # type: ignore
 91        if self._debug and self._show_event_source:
 92            import traceback
 93            log_info(f'[Automata] debug info: {traceback.format_stack()}')
 94        return self._state
 95
 96    def get_state(self) -> _TState:
 97        return self._state
 98
 99def event_handler(event_type):
100    def decorator(func):
101        def wrapper(self, *args, **kwargs):
102            return func(self, *args, **kwargs)
103        wrapper.__handler = [func, event_type]
104        return wrapper
105
106    return decorator
107
def Stateful(store_class, debug=False, show_event_source=False):
    """Class decorator that gives a class a store and a dispatcher.

    A ``store_class()`` instance is assigned to ``cls.store`` and wrapped in a
    ``Dispatcher`` assigned to ``cls._dispatcher``. ``__init__`` is replaced so
    that, once the original initializer has run, every class attribute still
    carrying a ``__handler`` marker is bound to the new instance, registered
    with the dispatcher, and has its marker removed.
    """
    def wrapper(cls):
        store = store_class()
        cls.store = store
        cls._dispatcher = Dispatcher(
            store,
            debug=debug,
            show_event_source=show_event_source,
        )

        original_init = cls.__init__

        def new_init(self, *args, **kwargs):
            original_init(self, *args, **kwargs)
            for member in cls.__dict__.values():
                if not hasattr(member, '__handler'):
                    continue
                raw_handler, handled_type = getattr(member, '__handler')
                # Bind the plain function to this instance so it receives self.
                bound_handler = raw_handler.__get__(self, cls)
                cls._dispatcher.register(bound_handler, handled_type)
                delattr(member, '__handler')

        cls.__init__ = new_init
        return cls

    return wrapper

Example

  1
  2import json
  3import os
  4import random
  5import time
  6
  7class GameState:
  8    agents: Dict[str, str] # name -> position
  9    player_hp: int
 10    round: int
 11
 12    def __init__(self):
 13        self.agents = {}
 14        self.player_hp = 100
 15        self.round = 0
 16    
 17    def print(self):
 18        log_info(f'[State] present state: {json.dumps(self.__dict__, indent=2)}')
 19    
 20    def num_enemies_in_pos(self, pos: str) -> int:
 21        num = 0
 22        for name, pos in self.agents.items():
 23            if name == 'player':
 24                continue
 25            if pos == pos:
 26                num += 1
 27        return num
 28    
 29    def get_player_pos(self):
 30        return self.agents['player']
 31
 32    def num_enemies(self):
 33        return len(self.agents) - 1
 34
class EventType:
    """Event identifiers used by the game; each one is a list of string segments."""

    # Start of a new round.
    ROUND_START = ['round', 'start']
    # Agent movement and lifecycle.
    AGENT_ENTER = ['agent', 'enter']
    AGENT_SPAWN = ['agent', 'spawn']
    AGENT_EXIT = ['agent', 'exit']
    AGENT_DIE = ['agent', 'die']
    # Hit-point changes.
    AGENT_HP_DECR = ['agent', 'hp', 'decr']
    AGENT_HP_INCR = ['agent', 'hp', 'incr']
 43
 44
 45@Stateful(GameState, debug=False)
 46class Game:
 47    _dispatcher: Dispatcher[GameState]
 48    @property
 49    def store(self) -> GameState:
 50        return self._dispatcher.get_state()
 51
 52    @event_handler(EventType.AGENT_SPAWN)
 53    def _on_agent_spawn(self, state: GameState, payload: Dict) -> GameState:
 54        name = payload['name']
 55        pos = payload['position']
 56        state.agents[name] = pos
 57        if name != 'player' and state.get_player_pos() == pos:
 58            print(f'player is attacked by {name} who just spawned')
 59            state = self._dispatcher.dispatch(EventType.AGENT_HP_DECR, {
 60                'value': 10
 61            })
 62        return state
 63
 64    @event_handler(EventType.AGENT_ENTER)
 65    def _on_agent_enter(self, state: GameState, payload: Dict) -> GameState:
 66        name = payload['name']
 67        pos = payload['position']
 68        print(f'{name} enter {pos}')
 69        state.agents[name] = pos
 70
 71        if name != 'player' and state.get_player_pos() == pos:
 72            print(f'player entered {pos} and is attacked by {name}')
 73            state = self._dispatcher.dispatch(EventType.AGENT_HP_DECR, {
 74                'value': 10
 75            })
 76        for name, pos in state.agents.items():
 77            if name == 'player':
 78                continue
 79            if pos == state.get_player_pos():
 80                print(f'player is attacked by {name} who is already in {pos}')
 81                state = self._dispatcher.dispatch(EventType.AGENT_HP_DECR, {
 82                    'value': 10
 83                })
 84
 85        return state
 86    
 87    @event_handler(EventType.AGENT_EXIT)
 88    def _on_agent_exit(self, state: GameState, payload: Dict) -> GameState:
 89        name = payload['name']
 90        state.agents.pop(name)
 91        return state
 92
 93    @event_handler(EventType.AGENT_HP_DECR)
 94    def _on_agent_hp_decr(self, state: GameState, payload: Dict) -> GameState:
 95        value = payload['value']
 96        state.player_hp -= value
 97        print(f'player hp: {state.player_hp}')
 98        if state.player_hp <= 0:
 99            state.player_hp = 0
100            self._dispatcher.dispatch(EventType.AGENT_DIE, {
101                'name': 'player'
102            })
103        return state
104    
105    @event_handler(EventType.AGENT_HP_INCR)
106    def _on_agent_hp_incr(self, state: GameState, payload: Dict) -> GameState:
107        value = payload['value']
108        state.player_hp += value
109        return state
110    
111    @event_handler(EventType.AGENT_DIE)
112    def _on_agent_die(self, state: GameState, payload: Dict) -> GameState:
113        name = payload['name']
114        state.agents.pop(name)
115        print(f'{name} is dead')
116        return state
117    
118    @event_handler(EventType.AGENT_DIE)
119    def on_player_die(self, state: GameState, payload: Dict) -> GameState:
120        print('you are dead')
121        print(f'you survived {state.round} rounds')
122        os._exit(0)
123
124    @event_handler(EventType.ROUND_START)
125    def _on_round_start(self, state: GameState, payload: Dict) -> GameState:
126        state.round += 1
127        print()
128        print(f'round {state.round} start')
129
130        print('num of enemies: ', state.num_enemies())
131        print('you are at position: ', state.get_player_pos())
132        valid_pos = [pos for pos in ['A', 'B', 'C'] if pos != state.get_player_pos()]
133        pos = input(f'choose a position to go: {valid_pos}')
134        while pos not in valid_pos:
135            print('invalid position')
136            pos = input(f'choose a position to go: {valid_pos}')
137            return state
138        self._dispatcher.dispatch(EventType.AGENT_EXIT, {
139            'name': 'player'
140        })
141
142        self._dispatcher.dispatch(EventType.AGENT_ENTER, {
143            'name': 'player',
144            'position': pos
145        })
146
147        # randomly spawn 0..3 enemy
148        num_enemies = random.randint(0, 3)
149        for i in range(num_enemies):
150            self._dispatcher.dispatch(EventType.AGENT_SPAWN, {
151                'name': f'enemy{i}',
152                'position': random.choice([pos for pos in 'ABC' if pos != state.get_player_pos()])
153            })
154        # randomly exit 0..N enemy
155        num_enemies = state.num_enemies()
156        num_exit = random.randint(0, num_enemies)
157        for i in range(num_exit):
158            name = random.choice(list(state.agents.keys()))
159            if name == 'player':
160                continue
161            self._dispatcher.dispatch(EventType.AGENT_EXIT, {
162                'name': name
163            })
164
165        return state
166
167    def main_loop(self):
168        def sleepy(state, event_type, payload) -> GameState:
169            time.sleep(0.3)
170            return state
171        self._dispatcher.register(sleepy)
172        self._dispatcher.dispatch(EventType.AGENT_SPAWN, {
173            'name': 'player',
174            'position': 'A'
175        })
176        while True:
177            self._dispatcher.dispatch(EventType.ROUND_START, {})
178
# Create the game and start its main loop, which runs rounds until the process exits.
game = Game()
game.main_loop()