meine Ruby-Artikel

Disko - eine Klasse zum Verwalten von Threads

Ich habe ein Programm, welches gleichartige Aufgaben auf mehrere Threads verteilt. So weit funktionierte das auch ganz gut. Ich war allerdings nicht ganz zufrieden, daß ich die ganze Verwaltung der verschiedenen Threads in meinem Anwendungsprogramm machen sollte. Ich wollte beispielsweise haben, daß z.B. 10 Aufgaben auf Threads verteilt werden sollten, jedoch nur maximal 3 Threads gleichzeitig aktiv sind.

    1	class Disko
    2	
    3	  attr_reader :maximum
    4	
    5	
    6	  def initialize(maximum)
    7	    # Fassungsvermoegen
    8	    @maximum = maximum
    9	    @running = Array.new
   10	    @warteschlange = Array.new
   11	    @threads = [] 
   12	    @thread_names = []
   13	
   14	  end
   15	
   16	  #def <<(o)
   17	  #  @warteschlange << o
   18	  #end
   19	  def push o
   20	    @warteschlange.push(o)
   21	  end
   22	
   23	  def to_s
   24	    @running.to_s+"\n"+
   25	    @warteschlange.to_s
   26	  end
   27	
   28	  # add a new item 
   29	  def add(item)
   30	    @warteschlange.push(item)
   31	  end
   32	
   33	  # show waiting & running array
   34	  def show
   35	    s=""
   36	    @running.each {|r|
   37	      s << r.to_s << " "
   38	    }
   39	    s << "\n"
   40	    @warteschlange.each {|r|
   41	      s << r.to_s << " "
   42	    }
   43	    s
   44	  end
   45	
   46	  def start_thread(item)
   47	    print "start_thread mit"
   48	    p item
   49	    @thread_names << item[0]
   50	    #begin
   51	      @threads << Thread.new(item) do |item|
   52	        site_name = item[0]
   53	        @priority = item[1]
   54	        `mkdir logs_#{site_name} 2>/dev/null`
   55	        p site_name
   56	        start = Time.now
   57	        start_meldung site_name, start
   58	        # puts "lade: test_"+site_name+".rb"
   59	        begin
   60	          load "test_"+site_name+".rb", true
   61	        rescue
   62	          puts "Fehler #{site_name} aufgetreten (OUT OF BUFFER SPACE?)"
   63	        end
   64	        ende = Time.now
   65	        ende_meldung site_name, ende
   66	        puts "Laufzeit: "+duration(start,ende)
   67	        datum = Time.now.strftime("%Y-%m-%d")
   68	        log_done(site_name,datum)
   69	      end # Thread
   70	    #rescue
   71	    #  puts "Fehler in Thread aufgetreten"
   72	    #end
   73	  end
   74	
   75	  def running_tasks 
   76	    count=0
   77	    @threads.each {|thr|
   78	      count += 1 if thr.alive?
   79	    }
   80	    count
   81	  end
   82	
   83	  def open
   84	    p @warteschlange
   85	    while running_tasks < @maximum and
   86	          @warteschlange.size > 0
   87	      item = @warteschlange.delete_at(0)
   88	      puts "lasse rein: "
   89	      p item
   90	      start_thread(item)
   91	    end
   92	    puts "open beendet"
   93	  end
   94	
   95	  def run
   96	    # Schleife, solange noch welche in Warteschleife oder laufende noch nicht beendet
   97	    while @warteschlange.size > 0 or
   98	          running_tasks > 0
   99	      puts "hier ist run..."
  100	      if running_tasks < @maximum and
  101	         @warteschlange.size > 0
  102	        item = @warteschlange.delete_at(0)
  103	        puts "lasse rein: "
  104	        p item
  105	        start_thread(item)
  106	      end
  107	      puts "schlafe"
  108	      puts "drin:"
  109	      pp @threads
  110	      @threads.each_with_index {|thr, i|
  111	        if thr.alive?
  112	          puts @thread_names[i]+" "+thr.priority.to_s
  113	        end
  114	      }
  115	
  116	      puts "diese warten noch vor der Tuer:"
  117	      p @warteschlange
  118	      sleep 30
  119	    end 
  120	  end
  121	
  122	  def close
  123	    puts "close"
  124	    puts " Auf Beenden aller Threads warten"
  125	    @threads.each {|thr| thr.join }
  126	    puts "alle Threads sind beendet"
  127	  end
  128	
  129	end # Disko

Die Verwendung der Klasse sieht folgendermaßen aus: dies ist nur ein Ausschnitt aus dem Anwendungsprogramm, der die Verwendung demonstrieren soll.
disko = Disko.new(3)

puts "running tasks: #{running_tasks(threads)}"
SITES_DEVELOPMENT.each {|site|
  site_name = site[0]
  if schon_gelaufen?(site_name,force_tests)
    puts "#{site_name} schon gelaufen"
  else
    # queue
    disko.add(site)
  end
}
disko.open
disko.run
disko.close # wartet auf Beendigungg aller Tasks
Der Aufruf 'disko = Disko.new(3)' erzeugt ein neues Disko-Objekt, die maximale Anzahl von parallel laufenden Threads wird auf drei begrenzt. Diese Anzahl kann derzeit zwar abgefragt, aber nachträglich nicht mehr geändert werden.

Jetzt brauchen wir nicht mehr zu tun, als alle zu startenden Tasks einfach dem Disko-Ojekt zu übergeben. Die weitere Abarbeitung erfolgt dort automatisch. Dies geschieht mit dem Aufruf 'disko.add(site)'. Hier können prinzipiell alle Arten von Objekten übergeben werden - derzeit muß die Lauf-Methode aber auch noch daraufhin angepaßt werden, hier ist noch ein bißchen Entwicklungsarbeit nötig, falls man diese Klasse wirklich allgemeingültig verwenden will. Momentan ist die Lauf-Methode auf meine Zwecke (automatisiertes Website-Testen ausgelegt). Jetzt haben wir in der Schleife alle gewünschten Objekte an Disko übergeben. Jetzt noch "disko.open" aufrufen, welches die Disko eröffnet. Dann "disko.run" aufrufen, welche alle wartenden Objekte startet, aber jeweils nur z.B. 3 gleichzeitig und darauf wartet, daß alle alle wartendenden Ojekt ihre Aktionen durchführen. "disko.close" wiederum schließt die Disko und wartet darauf, da&szlg; alle noch laufenden Threads beendet werden. Man sieht, die Behandlung im Hauuptprogramm wird wesentlich übersichtlicher, wenn man sich um die detaillierte Steuerung von Threads nicht kümmern muß, sondern dies einfach delegiert. Ach ja: Der Name Disko dürfte klar sein, oder? Im Gegensatz zu einer realen Disko, kommt hier aber jedes Objekt garantiert rein und nach dem 'close' der Disko wird auch garantiert noch so lange gewartet, bis das letzte Objekt seine Aktionen beendet hat.

Referenzen

Die Disko-Klasse wird in folgenden Projekten verwendet: