""" Python implementation of the "Stigmergy Game" appearing in: Savit, Riolo, Riolo (2013) "Co-adaptation and the Emergence of Structure." PLoS ONE [http://journals.plos.org/plosone/article?id=10.1371/journal.pone.0071828] The original implementation was done by Maria Riolo and this refactorization was written by Jon Atwell. Necessary Files: stigmergy_game.py (this file) stigmergy_game_runner.py Environment: Python 2.7.x - packages: math, random, sys, itertools Description of Model: This is a model of co-adaptation of behavior and enviroment. There is a single environment capable of being in E different states. N agents in the game take turns acting to change the state of the environment in accordance with their current strategy. The order of play is determined randomly. Agents have S strategies, which are state-contingent strategy tables; given the current state of the environment, the agent changes the environment to a predetermined state and receives a reward for that action. The strategies are constructed as follows: for each input state, an output state is randomly chosen, IID. Then a random vector of rewards of 1's, -1's, and 0's is generated. It is of length E (the number of environmental states) and has the constraint that it sums to zero. The reward in the ith position in this vector is the reward associated with the ith environment state. Each agent has S such strategies and after each turn, chooses the strategy which would have the highest total rewards had it been played the whole game. Description of File: This file does not contain a 'main' method (i.e., is not executable). It implements the classes used by the game and has a few utility functions. The instantiation of a game and the execution of it is handled by stigmergy_game_runner.py. That module also handles any I/O tasks, although this module does a lot of the data packaging """ import math import random import itertools as it # FUNCTIONS def randomScores(num_environs, setupRNG): """ A function to construct a list of rewards that sum to zero. It constructs a random list and then adds and removes individual integers until it sums to zero. The argument setupRNG is a random number generator. It is supplied as an argument so that it is possible to reproduce a set of strategies by supplying the same RNG. INPUT PARAMETERS num_environs: number of input enviromental states setupRNG: a random number generator OUTPUT my_list: a list of integers """ my_list = [setupRNG.randint(-1,1) for i in range(num_environs)] while sum(my_list) != 0: my_list.pop(0) # removing first element my_list.append(setupRNG.randint(-1,1)) # adding a new value # we shuffle the list (in place) before returning it. setupRNG.shuffle(my_list) return my_list def randomOutputs(num_environs, setupRNG): """ Returns an IID list of environment states of length 'num_environs'. This function is used to make lists of output states for the strategies. INPUT PARAMETERS num_environs: number of environmental states. setupRNG: a random num generator. OUTPUT list of environmental states of length num_environs. """ return [setupRNG.randrange(0, num_environs) for i in range(num_environs)] def GetEntropy(L): """ Returns the normalized entropy of a list of frequencies. INPUT PARAMETER L: the list of frequencies OUTPUT the entropy, as a float """ entropy = 0.0 max_entropy = -math.log(1.0/len(L)) total = sum(L) for p in L: if not p == 0: entropy -= p*math.log(p) return entropy / max_entropy def find_groups(switches, steps): """ Returns the final group assignment along with other top assignments. An assignment is a list of ones and zeros that identifies ingroup members with ones. The scores are found for all possible assignments. The top eleven scores for N>3, top 7 for N=3, and top 3 for N=2 scores are returned as a list of tuples with the assignment and the score. Note that lower scores are better.) INPUT PARAMETER: switches time steps each agent last switched, as integer steps the number of time steps in the game, as integer OUTPUT: List of tuples of top assignments format: (assignment vector, score) """ # Getting the number of agents in the run. N=len(switches) # we convert the raw timestep the agents stopped at into the fraction of # run for which the agent didn't switch strategies proportion = [(1 - i/float(steps)) for i in switches] # we know use the fraction of the run for which agents didn't switch to # calculate the distances (absolute value) between all agents along # this dimension. distances = [ [] for i in range(N) ] max_d = [0, None] for i in range(N): for j in range(N): dist = abs(proportion[i]-proportion[j]) distances[i].append(dist) # the list possible assignments, courtesy of the itertools package combos = list(it.product([0,1],repeat=N)) # Because half of the combinations are the same assignments--just # different a labeling of in and out--we need only the first half combos = combos[:len(combos)/2] # for recording scores / labelings in a kludgy way. lowest_eleven_scores = [1000 for i in range(11)] lowest_eleven_groups = [None for i in range(11)] for combo in combos: # cleaning up the iterator's format assignments = [a for a in combo] # we use the following two variables to track the ingroup and outgroup # distances for the assignment in_total = 0.0 out_total = 0.0 # first loop through finds distances for just one group for j in range(N): for k in range(N): if assignments[j]==1 and (k>j): a = assignments[k]*distances[j][k] in_total += a # now switching the group labels switch_group = [abs(l-1) for l in assignments] # and running the same code for j in range(N): for k in range(N): if switch_group[j] == 1 and (k>j): a = switch_group[k]*distances[j][k] out_total += a # the measure is the average with-in/with-out distance, so we need to # know the sizes of the two grous. n_in = sum(assignments) n_out = N-sum(assignments) if n_in <= 2: combo_in = 1. else: combo_in = (n_in*(n_in-1))/2. if n_out <= 2: combo_out = 1. else: combo_out = (n_out*(n_out-1))/2. # the calculate of the score score = (in_total / combo_in) + (out_total / combo_out) # now we see if this score is better (lower) than any of the current # top eleven. if score < lowest_eleven_scores[-1]: i = 0 while score >= lowest_eleven_scores[i]: i +=1 lowest_eleven_scores.insert(i,score) lowest_eleven_scores.pop() # throwing out the smallest lowest_eleven_groups.insert(i,assignments) lowest_eleven_groups.pop() # throwing out the smallest # because for N=2,3 there won't be 11 scores, we trim the lists for score in lowest_eleven_scores: if score == 1000: lowest_eleven_groups.pop() lowest_eleven_scores = lowest_eleven_scores[:len(lowest_eleven_groups)] # finally, we need to make sure the agents who have gone the larger # fraction of the run without switch are labeled as the ingroup. for group in enumerate(lowest_eleven_groups): if group != None: flipped = [abs(l-1) for l in group[1]] old_tot = sum([i*j for i,j in zip(group[1], proportion)]) new_tot = sum([i*j for i,j in zip(flipped, proportion)]) in_size = sum(group[1]) try: if old_tot/float(in_size) < new_tot/float(N - in_size): lowest_eleven_groups[group[0]] = flipped except: if old_tot < new_tot: lowest_eleven_groups[group[0]] = flipped # we return a list of tuples of the assignment and its score return [(i,j) for i,j in zip(lowest_eleven_groups, lowest_eleven_scores)] # CLASSES class Strategy: """This is the class implementing the strategies the agents have. A strategy can be thought of as a table with three columns. The first column is the list of possible input environmental states. The second columnt is the (IID random) output enviromental state for each input state. The third column is the reward associated with each input/output action. The instances don't actually have the first column because the indices of the other two columns serves the same purpose. Strategies have: self.outputs: A list of integers. self.scores: A list of integers. Strategies do: getOutputandReward(): Returns a tuple of an output and reward. """ def __init__(self, num_environs, setupRNG): """Create a new strategy with 'num_environ' environmental states and rewards. CONSTRUCTOR PARAMETERS num_environs: An integer setupRNG: A random number generator. """ self.scores = randomScores(num_environs, setupRNG) self.outputs = randomOutputs(num_environs, setupRNG) def getOutputandReward(self,input_environment): """ Returns the new environment state and the reward associated with the input environment for the strategy. INPUT PARAMETER input_environment: An integer OUTPUT A tuple of integers (output, score) """ return (self.outputs[input_environment], self.scores[input_environment]) class Agent: """ An agent has strategies. One of those strategies is designated as the one currently being played. As the agent encounters the environment, its strategy dicates how it changes the environment and the reward for this action. After acting, the agent calculates which of its strategy would have the highest score had it been played the whole game (i.e. the theoretical score). This is set as the current strategy. Agent Fields: self.strategies: List of objects of the Strategy class self.my_strategy: Integer, indicating the index of the current strategy in the self.strategies list. self.strategy_scores: List of integers for the theoretical scores self.my_score: Integer, total accumulated rewards. self.last_score: Integer, earned last turn. self.tie: String, tie breaking behavior of Strategies self.switched: Float, 0.0 if current strategy didn't change 0.5 if tied last turn 1.0 if the current strategy did just change self.rewards: List of lists of integers, rewards for all strategies for a given environment. It is used to update the hypothetical scores quickly. self.wealth_data: List, heterogenous data types tracking wealth self.output_data: List, heterogenous data types tracking enviros self.switch_data: List,heterogenous data types tracking switches Agent methods: self.Output(env): Given an environment, return the output for that environment of the agent's currently used strategy. self.Score(env): Given an environment, update actual score and also the theoretical scores of the agent's strategies. self.PickStrategy(): Set the currently used strategy to the strategy with the highest theoretical score. In the case of a tie, don't change. self.DoStep(env): Given an environment, give output state, collect reward, update scores, find top strategy. """ def __init__(self, num_envs, num_strats, wndw_sz, setupRNG, runRNG, tie): """ Create an agent with 'num_strats' strategies. CONSTRUCTOR PARAMETERS: num_envs: Integer, number of possible environmental states. num_strats: Integer, number of Strategies agent has. wndw_sz: Integer, length of observation window setupRNG: Random number generator, to generate strategies. runRNG: Random number generator, to run game. tie: string, tie breaking behavior """ # Setting up Strategies, current Strategy, and reward lists self.strategies = [Strategy(num_envs, setupRNG) for i in range(num_strats)] self.my_strategy = setupRNG.randrange(0,num_strats) # Agent starts out having not switched Strategies self.switched = 0.0 self.tie = tie # These loops set up a list of lists with the rewards for all # the Strategies for a given environment. This is faster # than looking at every Strategy every time we need to update. self.rewards = [] for env in range(num_envs): scores = [] for strat in self.strategies: scores.append(strat.scores[env]) self.rewards.append(scores) self.strategy_scores = [0]*num_strats self.my_score, self.last_score, self.my_steps = (0,0,0) self.my_window_score, self.my_window_steps = (-1,-1) self.window_size = wndw_sz # is_filled, reward history, dict of frequency, avg_wealth, std_dev] self.wealth_data = [False, [],{-1:0, 0:0, 1:0},0,0] # is_filled, state history, dict of probabilities, current entropy] self.output_data = [False, [],{i:0 for i in range(num_envs)}, 0] # [time last switched, count switches in window, count strategies # used, frequency of strategies switched to] self.switch_data = [{}, 0, 0, 0, []] def output(self, environment): """ Given an environment, return the output of the agent's current strategy under this envionment. INPUT PARAMETER environment: Integer, current state of the environment. OUTPUTS Integer, output for given input in current Strategy. """ return self.strategies[self.my_strategy].outputs[environment] def score(self, environment): """ Given input environment, update the agent's actual score and theoretical scores of all Strategies. INPUT PARAMETER: environment: Integer, input state of the environment. OUTPUTS: None MODIFIES: self.my_score: Incremented by the current Strategy's reward for the given environment. self.last_reward: Update to reward from this turn. self.strategy_scores: Increment each score by corresponding strategy's reward for the current environment. """ # find reward reward = self.strategies[self.my_strategy].scores[environment] self.last_reward = reward # update actual running score self.my_score += reward # Update theoretical scores for i, val in enumerate(self.rewards[environment]): self.strategy_scores[i] += val def pickStrategy(self, runRNG): """ Set the currently used strategy to the strategy with the highest theoretical score. In the case of a tie, follow 'tie' parameter. INPUT PARAMETER run_RNG: random number generator, for breaking ties OUTPUT: None MODIFIES: self.my_strategy Set to Strategy with highest theoretical score self.switch Records switching behavior for turn """ # Reward of 1 => we know nothing can change because the current # strategy's score just increased. if self.last_reward != 1: # Remember which strategy used to be best. old_best = self.my_strategy # Get list of Strategies which tie for best theoretical score. best_score = max(self.strategy_scores) winners = [ind for ind,val in enumerate(self.strategy_scores) if val == best_score] # If there is a clear winner, pick that Strategy if len(winners): self.my_strategy = winners[0] # Otherwise, depends on tie breaking behavior else: if self.tie == 'guess': self.my_strategy = runRNG.choice(winners) if self.my_strategy == old_best: self.switched = 0.0 else: self.switched = 0.5 elif self.tie == 'stay': if not (self.my_strategy in winners): self.my_strategy = runRNG.choice(winners) self.switched = 1.0 else: self.switched = 0.0 def go(self, environment,runRNG): """ Do a step given the environment encountered; Return the output, update scores, pick Strategy to use. INPUT PARAMETER: environment: Integer, current environmental state. OUTPUT: Integer, a new state for environment. MODIFIES: self.score Adds last reward to self.score self.my_steps Increments by 1 self.my_strategy Via pickStrategy method """ self.score(environment) self.pickStrategy(runRNG) self.my_steps += 1 return self.output(environment) def update_wealth_data(self, new_reward): """ Tracks individual wealth statistics over the course of the run. These data are being tracked over the last self.window_size steps so there is a lot a deleting of old data and adding of new data. INPUT PARAMETER new_reward: Integer, last reward OUTPUT: None MODIFIES: self.wealth_data Updates a whole bunch of stuff in this list """ # This is an awkward way to deal with the first window if self.wealth_data[0] == True: # removing the oldest wealth in the old window oldest = self.wealth_data[1].pop(0) else: oldest = None # adding the last reward to the historical stack self.wealth_data[1].append(new_reward) # checking to see if our data matches the window length now. if len(self.wealth_data[1]) - 1 == self.window_size: self.wealth_data[0] = True # updating running totals. The goal here is to avoid # summing up a whole list. Instead, I track the counts # of each rewards as a fraction of all rewards. inc = round(1/float(self.window_size),6) if self.wealth_data[2][new_reward] < 1: self.wealth_data[2][new_reward] += inc if oldest != None: if self.wealth_data[2][oldest] > 0: self.wealth_data[2][oldest] -= inc # calcuting the average wealth for the window average = 0 for val in self.wealth_data[2]: average += val * self.wealth_data[2][val] self.wealth_data[3] = average # calculating the standard dev. of wealth for window errors = 0 for val in self.wealth_data[2]: errors += ((val - average)**2)*( self.wealth_data[2][val]*self.window_size) self.wealth_data[4] = math.sqrt(errors/(float(self.window_size))) def update_output_data(self, output): """This method allows us to track the history of outputs and averages over a window. INPUT PARAMETER output: Integer, last output environment OUTPUT: None MODIFIES self.output_data Updates a whole bunch of stuff in this list """ if self.output_data[0] == True: oldest = self.output_data[1].pop(0) else: oldest = None # adding the last output environment to the historical stack self.output_data[1].append(output) # checking to see if the list is filled now for the next turn if len(self.output_data[1])-1 == self.window_size: self.output_data[0] = True # updating running totals inc = round(1/float(self.window_size),6) if self.output_data[2][output] < 1: self.output_data[2][output] += inc if oldest != None: if self.output_data[2][oldest] > 0: self.output_data[2][oldest] -= inc if self.output_data[0] == True: self.output_data[3] = GetEntropy(self.output_data[2].values()) def update_switch_data(self, current_time, num_agents): """ This method allows us to track the switching behavior of the agent over the window. INPUT PARAMETERS current_time: Integer, game timestep num_agents: Integer, number of agents OUTPUT: None MODIFIES: self.switch_data Updates a bunch of stuff in this list """ if len(self.switch_data[0]) > 0: strats = {} dels = [] for time in self.switch_data[0]: # check if switch time was in the window. if time + (self.window_size * num_agents) < current_time: dels.append(time) else: strat = self.switch_data[0][time] try: strats[strat] +=1 except: strats[strat] = 0 self.switch_data[1] = time for time in dels: del self.switch_data[0][time] if len(strats) == 0: self.switch_data[2] = 1 self.switch_data[3] = sum(strats.values()) self.switch_data[4] = strats.values() class World: """ A World with an environment and Agents in it. World Fields: self.time: Integer, counts steps taken self.environment: Integer, current environment state self.agents: List of Agent objects self.agent_last_moved: Integer, index of Agent who last acted World Methods: DoStep(): An Agent takes turn, time is incremented. WhoNext(): Pick which Agent takes next turn. reportData(): Returns list of data related to changes after each turn. """ def __init__(self, num_environs, num_agents, num_strats, window_size, start_point, setupRNG, runRNG, tie='guess', order_type='random'): """ Create a World with given number of environments, Agents, and Strategies per Agent. CONSTRUCTOR PARAMETERS: num_environs: Integer, number of environmental states. num_agents: Integer, number of Agents. num_strats: Integer, number of Strategies per Agent. tie: String, behavior for tied Strategies: 'stay' (use same if still tied for best) or 'guess' (pick at random among best). Defaults to guess order_type: String, 'random' or 'fixed' turn order. setupRNG: Random number generator for setting up Agents & Strategies runRNG: Random number generator, runs game """ self.time = 0 self.agent_last_moved = None self.num_agents = num_agents self.order_type = order_type self.runRNG = runRNG self.start_recording_point = start_point - (window_size * num_agents) self.environment = self.runRNG.randrange(0, num_environs) self.last_environ = -1 self.last_switched = [0 for i in range(num_agents)] self.agents = [Agent(num_environs, num_strats, window_size, setupRNG, self.runRNG, tie) for i in range(num_agents)] def whoNext(self): """ Determines which agent will go next. INPUT PARAMETERS None OUTPUT: Integer, index of Agent to move next """ if self.order_type == "random": return self.runRNG.choice(range(len(self.agents))) else: if self.agent_last_moved < len(self.agents)-1: return self.agent_last_moved + 1 else: return 0 def doStep(self): """ Picks an Agent to take a turn, records new environmental state. INPUT PARAMETERS None OUTPUTS None MODIFIES: self.environment New state, determined by acting Agent self.last_environ Old state, for data production purposes Agent.wealth_data See Agent definitions Agent.output_data See Agent definitions Agent.switch_data See Agent definitions """ # Record the starting environment. self.last_environ = self.environment # Choose an agent to go this turn next_agent_index = self.whoNext() self.agent_last_moved = next_agent_index next_agent = self.agents[next_agent_index] # before strategy starting_strat = int(next_agent.my_strategy) # Agent takes turn, outputs new state self.environment = next_agent.go(self.environment, self.runRNG) new_strat = next_agent.my_strategy if starting_strat != new_strat: next_agent.switch_data[0][self.time] = new_strat self.last_switched[next_agent_index] = self.time self.agents[next_agent_index].switch_data[1] = self.time if self.time >= self.start_recording_point: next_agent.update_output_data(self.environment) next_agent.update_wealth_data(next_agent.last_score) next_agent.update_switch_data(self.time, self.num_agents) # Increment time by 1 self.time += 1 def reportData(self): """ Report data about the last move to the runner module. INPUT PARAMETERS: None OUTPUTS: List of data about last time step """ agent = self.agent_last_moved return [self.time, agent, self.agents[agent].my_strategy, self.last_environ, int(self.agents[agent].last_score), self.environment] def finalReport(self, theFile): """ Writes data from the run to a file INPUT PARAMETERS: theFile: A python I\O object to write to disk OUTPUTS: None MODIFIES: A file on disk """ ind_avg_wealths = [agent.my_score/float(agent.my_steps) for agent in self.agents] group_avg_wealth = sum(ind_avg_wealths)/float(len(self.agents)) std_dev = math.sqrt(sum([(ind - group_avg_wealth)**2 for ind in ind_avg_wealths])/float(len(self.agents))) strats = [str(agent.my_strategy) for agent in self.agents] big_list = ['wealths = ' + ','.join([str(i) for i in ind_avg_wealths]), 'strategies = ' + ','.join(strats), 'last_switch = ' + ','.join([str(i) for i in self.last_switched]), 'average wealth = ' + str(group_avg_wealth), 'StdDev_wealth = ' + str(std_dev)] theFile.write("\n".join(big_list)) def finalReport_alt(self, theFile, wealths, counts,twealths,tcounts, playing,enviros, entropy, setupseed, runseed): """A terribly clumsy alternative method for packaging up a bunch of summary information that can be read more easily than the long output files. Sorry I didn't make it pretty. """ ind_avg_wealths = [round(agent.my_score/float( agent.my_steps),5) for agent in self.agents] group_avg_wealth = (sum(ind_avg_wealths) /float(len(self.agents))) group_std_dev = math.sqrt(sum([(ind - group_avg_wealth)**2 for ind in ind_avg_wealths])/float(len(self.agents))) strats = [agent.my_strategy for agent in self.agents] groups = find_groups(self.last_switched,self.time) winner, score = groups[0] scores = [] for gr in groups: scores.append(gr[1]) avg = sum(scores)/float(len(scores)) sumofsq = 0 for sc in scores: sumofsq += (avg - sc)**2 std_dev = (sumofsq/float(len(scores)))**.5 try: robust = (avg-winner[1])/float(std_dev) except: robust = 0 in_group_size = int(sum(winner)) out_group_size = int(len(winner)-in_group_size) window_wealth = [round(i/float(j),5) for i,j in zip(twealths, tcounts)] follow_wealth = [] winner_copy = list(winner) for agent_index, group_mem in enumerate(winner): in_wealth = 0 in_count = 0 out_wealth = 0 out_count = 0 for other_agent, mem in enumerate(winner_copy): if mem == group_mem: in_wealth += wealths[agent_index][other_agent] in_count += counts[agent_index][other_agent] else: out_wealth += wealths[agent_index][other_agent] out_count += counts[agent_index][other_agent] if in_count != 0: inn = round(in_wealth/float(in_count),5) else: inn = 0 if out_count !=0: out = round(out_wealth/float(out_count),5) else: out = 0 follow_wealth. append((inn, out)) if sum(winner) !=0: in_avg = round(sum([i*j for i,j in zip(window_wealth, winner)])/float(sum(winner)),5) else: in_avg = 0 if sum(winner) != len(winner): out_avg = round(sum([i*j for i, j in zip(window_wealth, [abs(k-1) for k in winner])])/ float(len(winner) - sum(winner)),5) else: out_avg = 0 ingroup_envs = [0 for i in range(len(enviros))] outgroup_envs = [0 for i in range(len(enviros))] for agent_ind in range(len(winner)): if winner[agent_ind]: for indx, count in enumerate(entropy[agent_ind]): ingroup_envs[indx] += count else: for indx, count in enumerate(entropy[agent_ind]): outgroup_envs[indx] += count in_evs = float(sum(ingroup_envs)) out_evs = float(sum(outgroup_envs)) total_evs = float(sum(enviros)) ingroup_ent = round(GetEntropy([i/in_evs for i in ingroup_envs]),3) outgroup_ent = round(GetEntropy([i/out_evs for i in outgroup_envs]),3) total_ent = round(GetEntropy([i/total_evs for i in enviros]),3) print "Ingroup ent:",ingroup_ent print "Outgroup ent:", outgroup_ent print "Difference:", ingroup_ent - outgroup_ent print "total entropy:", total_ent big_list = (["group = " + str(winner), 'step_last_switch = ' + str(self.last_switched), 'final_strategies = ' + str(strats), "window_wealth = " + str(window_wealth), "following wealth = " + str(follow_wealth), 'total_wealths = ' + str(ind_avg_wealths), 'strats_playing = ' + str(playing), 'enviro_freq = ' + str(enviros), 'total entropy = ' + str(total_ent), 'group_size (I,O) = ' + str(in_group_size) + "," + str(out_group_size), 'group_wealth (I,O) = ' + str(round(in_avg,5)) + "," + str(round(out_avg,5)), 'whole_wealth (and SD) = ' + str(round(group_avg_wealth,5)) + ',' + str(round(std_dev,5)), 'ingroup, outgroup entropies = ' + (str(ingroup_ent) + "," + str(outgroup_ent)), 'entropy diff = ' + str(ingroup_ent - outgroup_ent), 'setup seed, run seed = ' + str(setupseed) + "," + str(runseed)]) theFile.write("\n".join(big_list))