import os, sys # for the movie ffmpeg command import numpy import Image # used by exporttiles() # insert into file a comment which looks e.g. like this: # highS: 0.099849 lowS: -0.099849 def exportinfo (filename, highS, lowS): f = open(filename, 'rb') content = f.read() f.close() f = open(filename, 'wb') charcount = 0 for char in content: f.write(char) if charcount == 2: f.write('# highS: %.6f lowS: %.6f\n' % (highS, lowS)) charcount += 1 f.close() def exporttiles (X, x, y, a, b, frame, filename): # print filename, ' x, y, a, b, frame=', x, y, a, b, frame Z = numpy.random.uniform (0.0, 0.1, (x*y, a*b)) Z = -X xy, ab = numpy.shape(X) if (xy != x*y) or (ab != a*b): print 'imagetiles: size error' if numpy.min(X) < 0.0: Y = numpy.zeros((frame + x*(a+frame), frame + y*(b+frame))) else: Y = 0.25 * numpy.max(X) * numpy.ones((frame + x*(a+frame), frame + y*(b+frame))) image_id = 0 for xx in range(x): for yy in range(y): if image_id >= xy: break tile = numpy.reshape (Z[image_id], (a, b)) beginA, beginB = frame + xx*(a+frame), frame + yy*(b+frame) Y[beginA : beginA+a, beginB : beginB+b] = tile # print Y[beginA : beginA+a, beginB : beginB+b] image_id += 1 im = Image.new ("L", (frame + y*(b+frame), frame + x*(a+frame))) im.info = 'comment here does not work' im.putdata (Y.reshape((frame + x*(a+frame)) * (frame + y*(b+frame))), offset=-Y.min()*255.0/(Y.max()-Y.min()), scale=255.0/(Y.max()-Y.min()) ) im.save(filename) # seems to ignore the colormap exportinfo (filename, numpy.max(X), numpy.min(X)) class exportgraph: def __init__(self): self.firsttime = 1 self.filename = "" def put (self, val, filename): if self.firsttime == 1: if os.path.exists (filename): os.remove (filename) self.firsttime = 0 f = open(filename, 'a') f.write ('%f\n' % val) f.close() self.filename = filename def plot (self): print 'gnuplot' print 'set style data lines' print 'plot \"' + self.filename + '\"' class movie: def __init__(self): self.files = [] self.counter = 0 def frame (self, X, x, y, a, b, frame, filename): self.filename = filename if self.counter < 10: filecounted = filename + '_000' + str(self.counter) elif self.counter < 100: filecounted = filename + '_00' + str(self.counter) elif self.counter < 1000: filecounted = filename + '_0' + str(self.counter) else: filecounted = filename + '_' + str(self.counter) filecounted_end = filecounted + '.pgm' exporttiles (X, x, y, a, b, frame, filecounted_end) wovon = 'convert ' + filecounted_end + ' ' + filecounted + '.gif' os.system (wovon) os.remove (filecounted_end) # print 'exported ', filecounted_end self.files.append (filecounted) self.counter += 1 def make (self): print 'making movie or animgif', self.filename # wovon = 'ffmpeg -y -sameq -i ' + self.filename + '_%04d.png ' + self.filename wovon = '/informatik/home/weber/bin/gifsicle --delay 3 --loopcount=forever -o ' + self.filename + '_anim.gif ' + self.filename + '_????.gif' os.system (wovon) for fname in self.files: fname_end = fname + '.gif' os.remove (fname_end) # also called softmax activation def rand_winner (S_from, beta): sum = 0.0 p_i = 0.0 rnd = numpy.random.random() d_r = len (S_from) sel = 0 for i in range (d_r): sum += numpy.exp (beta * S_from[i]) S_target = numpy.zeros (d_r) for i in range (d_r): p_i += numpy.exp (beta * S_from[i]) / sum if p_i > rnd: sel = i rnd = 1.1 # out of reach, so the next will not be turned ON return sel class stat: def __init__(self): self.list = [] def sample (self, value): self.list.append(value) def moments (self): a = numpy.array(self.list) mean, variance = 0.0, 0.0 for i in a: mean += i mean /= len(self.list) for i in a: variance += (i - mean)**2 variance /= len(self.list) return ((mean, numpy.sqrt(variance))) class world_model_RL_dashes: def __init__(self, size_a, size_b, len_a, len_b, periodic_boundary): # init input position self.sel_a = numpy.random.randint (0, size_a-1-len_a) self.sel_b = numpy.random.randint (0, size_b-1-len_b) self.size_a = size_a self.size_b = size_b self.len_a = len_a # this is the actual length_a - 1 self.len_b = len_b # this is the actual length_b - 1 self.ind = numpy.arange (0, self.size_a * self.size_b) self.periodic_boundary = periodic_boundary def newinit(self): self.sel_a = numpy.random.randint (0, self.size_a-1-self.len_a) self.sel_b = numpy.random.randint (0, self.size_b-1-self.len_b) def act(self, act): # position world reaction if numpy.random.random_sample() > 0.2: if act == 0: self.sel_a -= 1 elif act == 1: self.sel_b += 1 elif act == 2: self.sel_a += 1 elif act == 3: self.sel_b -= 1 else: print 'action out of bounds' if self.periodic_boundary == 1: if self.sel_a < 0: self.sel_a = self.size_a - 1 # - self.len_a elif self.sel_a >= self.size_a: # - self.len_a: self.sel_a = 0 if self.sel_b < 0: self.sel_b = self.size_b - 1 # - self.len_b elif self.sel_b >= self.size_b: # - self.len_b: self.sel_b = 0 else: if self.sel_a < 0: self.sel_a = 0 elif self.sel_a >= self.size_a - self.len_a: self.sel_a = self.size_a - 1 - self.len_a if self.sel_b < 0: self.sel_b = 0 elif self.sel_b >= self.size_b - self.len_b: self.sel_b = self.size_b - 1 - self.len_b def reward(self): if self.sel_a == (self.size_a-1)/2 and self.sel_b == (self.size_b-1)/2: return 1.0 else: return 0.0 def sensor(self): p = numpy.zeros(self.size_a * self.size_b) if self.periodic_boundary == 1: for aa in range (1 + self.len_a): for bb in range (1 + self.len_b): p[((self.sel_a + aa) % self.size_a) * self.size_a + ((self.sel_b + bb) % self.size_b)] = 1.0 else: for aa in range (1 + self.len_a): for bb in range (1 + self.len_b): p[(self.sel_a + aa) * self.size_a + (self.sel_b + bb)] = 1.0 return p base = "/tmp/coco/" count_data_watch = 0 I_data_watch = [] numpy.random.seed(int(3)) simulation = 'fourlines' # 'points' or 'lines' or 'fourlines' (mind to adjust len_line, size_x/p_a/b) len_line = 1 # 0 if point; 1 or larger if dash periodic_boundary = 1 # if 0, mind that less lines than nxn fit on area; if 1, lines may cross boundaries; consider when choosing size_map non_periodic_boundary = 0 movie1 = movie () movie2 = movie () movie3 = movie () graph = exportgraph () duration_mean = 0 size_x_a, size_x_b = 6, 6 size_x = size_x_a * size_x_b size_p_a, size_p_b = 6, 6 size_p = size_p_a * size_p_b if simulation == 'points' or simulation == 'fourlines': size_I_a = size_x_a + size_p_a elif simulation == 'lines': size_I_a = size_x_a else: print 'Error: simulation mode not determined!' size_I_b = size_x_b size_I = size_I_a * size_I_b size_m_a, size_m_b = 6, 6 size_map = size_m_a * size_m_b size_mot = 4 w_map = numpy.random.uniform (-0.2, 0.1, (size_map, size_I)) w_map = numpy.clip (w_map, 0.0, numpy.inf) w_mot = numpy.random.uniform (0.0, 0.01, (size_mot, size_map)) I_avrg = 0.1 * numpy.ones (size_I) I_sample = numpy.zeros (size_I) m_sample = numpy.zeros (size_map) delta_sample = 0 world_p = world_model_RL_dashes(size_p_a, size_I_b, 0, len_line, periodic_boundary) world_x = world_model_RL_dashes(size_x_a, size_I_b, len_line, 0, periodic_boundary) world_p2 = world_model_RL_dashes(size_p_a, size_I_b, 0, len_line, periodic_boundary) world_x2 = world_model_RL_dashes(size_x_a, size_I_b, len_line, 0, periodic_boundary) beta = 2.0 for iter in range (50000): world_p.newinit() world_x.newinit() world_p2.newinit() world_x2.newinit() r = world_p.reward() # read reward I1 = world_p.sensor() # read old state I2 = world_x.sensor() I3 = world_p2.sensor() # read old state I4 = world_x2.sensor() if simulation == 'points': I = numpy.append (I1, I2) if simulation == 'lines': I = numpy.clip (I1 + I2, 0.0, 1.0) # only clipping at 1.0 is effective! I /= numpy.sqrt(2.0*(len_line+1)) # normalize input if simulation == 'fourlines': Ia = numpy.clip (I1 + I2, 0.0, 1.0) # only clipping at 1.0 is effective! Ia /= numpy.sqrt(2.0*(len_line+1)) # normalize input Ib = numpy.clip (I3 + I4, 0.0, 1.0) # only clipping at 1.0 is effective! Ib /= numpy.sqrt(2.0*(len_line+1)) # normalize input I = numpy.append (Ia, Ib) S = numpy.dot (w_map, I) # init win = numpy.argmax (S) # SOM winner S = numpy.zeros (size_map) # SOM S[win] = 1.0 # SOM h = numpy.dot (w_mot, S) act = rand_winner (h, beta) # choose action act_vec = numpy.zeros (size_mot) act_vec[act] = 1.0 val = 0.0 for i in range (size_mot): val += numpy.dot (w_mot[i], S) # SARSA value duration = 0 while r == 0: world_p.act(act) # do selected action world_x.act(act) # do selected action world_p2.act(act) # do selected action if act randomly: world_p2.act(numpy.random.randint(size_mot)) world_x2.act(act) # do selected action if iter <= 100000: r = world_p.reward() # read reward elif iter <= 200000: r = world_x2.reward() elif iter <= 300000: r = world_p2.reward() else: r = world_x.reward() I_tic1 = world_p.sensor() # read new state I_tic2 = world_x.sensor() I_tic3 = world_p2.sensor() I_tic4 = world_x2.sensor() if simulation == 'points': I_tic = numpy.append (I_tic1, I_tic2) if simulation == 'lines': I_tic = numpy.clip (I_tic1 + I_tic2, 0.0, 1.0) # only clipping at 1.0 is effective! I_tic /= numpy.sqrt(2.0*(len_line+1)) # normalize input if simulation == 'fourlines': Ia = numpy.clip (I_tic1 + I_tic2, 0.0, 1.0) # only clipping at 1.0 is effective! Ia /= numpy.sqrt(2.0*(len_line+1)) # normalize input Ib = numpy.clip (I_tic3 + I_tic4, 0.0, 1.0) # only clipping at 1.0 is effective! Ib /= numpy.sqrt(2.0*(len_line+1)) # normalize input I_tic = numpy.append (Ia, Ib) # count_data_watch += 1 # I_data_watch = numpy.append (I_data_watch, I_tic) # if count_data_watch == 40: # exporttiles (numpy.reshape (I_data_watch, (40, size_I)), 1, 40, size_I_a, size_I_b, 1, base+"obs_v_2_2.pgm") # exit () I_avrg += 0.0001 * (I_tic - I_avrg) # map activation m = numpy.dot (w_map, I_tic) # map feed in win_tic = numpy.argmax (m) # SOM winner m = numpy.zeros (size_map) # SOM m[win_tic] = 1.0 # SOM S_tic = m # action choice h = numpy.dot (w_mot, S_tic) act_tic = rand_winner (h, beta) # choose next action act_vec = numpy.zeros (size_mot) act_vec[act] = 1.0 # value val_tic = 0.0 for i in range (size_mot): val_tic += numpy.dot (w_mot[i], S_tic) # SARSA value delta = r + 0.99 * val_tic - val # prediction error; gamma = 0.9 # RL learning w_mot += 0.03 * delta * numpy.outer (act_vec, S) w_mot -= 0.03 * 0.00003 * w_mot * w_mot**2 # weight decay to maintain re-learnability -- also, without weight decay, error increased after ~75000 iter # map learning like Hebb w_map += 0.01 * numpy.outer (S * delta, I) # / I_avrg) # modulated Hebb if r == 1.0: w_map += 0.01 * numpy.outer (S_tic * delta, I_tic) # / I_avrg) w_map = numpy.clip (w_map, 0.0, numpy.inf) # weight rectification for kk in range (size_map): w_map[kk] = w_map[kk] / numpy.sqrt(numpy.sum(w_map[kk]**2)) # weight normalization # map learning like true Kohonen #w_map[win] += 0.01 * delta * (I - w_map[win]) #if r == 1.0: # w_map[win_tic] += 0.01 * delta * (I_tic - w_map[win_tic]) #w_map = numpy.clip (w_map, 0.0, numpy.inf) # weight rectification (weights would become negative because delta can be negative) #for kk in range (size_map): # w_map[kk] = w_map[kk] / numpy.sqrt(numpy.sum(w_map[kk]**2)) # weight normalization # new becomes old S[0:size_map] = S_tic[0:size_map] val = val_tic act = act_tic I[0:size_I] = I_tic[0:size_I] win = win_tic # for observe duration += 1 if numpy.random.random () < 0.05: I_sample[0:size_I] = I_tic[0:size_I] I_sample += r m_sample[0:size_map] = m[0:size_map] delta_sample = delta duration_mean += duration if iter % 250 == 0: exporttiles (w_map, size_m_a, size_m_b, size_I_a, size_I_b, 1, base+"obs_w_1_0.pgm") exporttiles (w_mot, 1, size_mot, size_m_a, size_m_b, 1, base+"obs_w_2_1.pgm") exporttiles (numpy.reshape (I_avrg, (1, size_I)), 1, 1, size_I_a, size_I_b, 1, base+"obs_I_2.pgm") exporttiles (numpy.reshape (I_sample, (1, size_I)), 1, 1, size_I_a, size_I_b, 1, base+"obs_J_2.pgm") exporttiles (numpy.reshape (m_sample, (1, size_map)), 1, 1, size_m_a, size_m_b, 1, base+"obs_M_2.pgm") exporttiles (numpy.reshape (m, (1, size_map)), 1, 1, size_m_a, size_m_b, 1, base+"obs_M_2.pgm") duration_mean /= 250.0 graph.put (duration_mean, base+"duration.dat") print iter, duration_mean, 'delta=%.2f' % delta, 'delta_sample=%.2f' % delta_sample duration_mean = 0 I_covered = numpy.zeros (size_I) for kk in range (size_map): I_covered[numpy.argmax(w_map[kk])] = 1.0 exporttiles (numpy.reshape (I_covered, (1, size_I)), 1, 1, size_I_a, size_I_b, 1, base+"obs_w_1_1.pgm") I_filled = numpy.zeros (size_I) for kk in range (size_map): I_filled += w_map[kk] exporttiles (numpy.reshape (I_filled, (1, size_I)), 1, 1, size_I_a, size_I_b, 1, base+"obs_v_1_1.pgm") if iter % 500 == 0: movie1.frame (numpy.reshape (I_covered, (1, size_I)), 1, 1, size_I_a, size_I_b, 1, base+"w_cov") movie2.frame (w_mot, 1, size_mot, size_m_a, size_m_b, 1, base+"w_mot") movie3.frame (w_map, size_m_a, size_m_b, size_I_a, size_I_b, 1, base+"w_map") movie1.make () movie2.make () movie3.make () graph.plot ()