Azure: Multithreads en Worker Role, un ejemplo

Published on Author lopezLeave a comment

En mi anterior post, implementé un simple worker role, consumiendo y produciendo números desde y en una cola. Ahora, una nueva aplicación:

El worker role implementa la generación de una secuencia Collatz. Para conocer sobre esta secuencia, problema:

http://mathworld.wolfram.com/CollatzProblem.html
http://en.wikipedia.org/wiki/Collatz_conjecture
http://www.ericr.nl/wondrous/

Pueden bajar la solución desde AjCodeKatas Google project. El código está en la rama:

http://code.google.com/p/ajcodekatas/source/browse/#svn/trunk/Azure/AzureCollatz

La página inicial es simple, sin validaciones:

El rango de números es enviada a la cola:

protected void btnProcess_Click(object sender, EventArgs e)
{
    int from = Convert.ToInt32(txtFromNumber.Text);
    int to = Convert.ToInt32(txtToNumber.Text);
    for (int k=from; k<=to; k++) 
    {
        CloudQueueMessage msg = new CloudQueueMessage(k.ToString());
        WebRole.Instance.NumbersQueue.AddMessage(msg);
    }
}

El worker role toma cada mensaje, y calcula la secuencia Collatz del número recibido en cada mensaje:

Agregué una clase en Azure.Library: un MessageProcessor que consumen mensajes de una cola, en su propio thread:

public MessageProcessor(CloudQueue queue, Func<CloudQueueMessage, bool> process)
{
    this.queue = queue;
    this.process = process;
}
public void Start()
{
    Thread thread = new Thread(new ThreadStart(this.Run));
    thread.Start();
}
public void Run()
{
    while (true)
    {
        try
        {
            CloudQueueMessage msg = this.queue.GetMessage();
            if (this.ProcessMessage(msg))
                this.queue.DeleteMessage(msg);
        }
        catch (Exception ex)
        {
            Trace.WriteLine(ex.Message, "Error");
        }
    }
}
public virtual bool ProcessMessage(CloudQueueMessage msg)
{
    if (msg != null && this.process != null)
        return this.process(msg);
    Trace.WriteLine("Working", "Information");
    Thread.Sleep(10000);
    return false;
}

El worker role, en vez de atender un mensaje a la vez, lanza un número fijo (12) de MessageProcessor. De esta manera, cada instancia de Azure que tengamos disponible se dedica a procesar varios mensajes, en paralelo. No sería necesario para este simple ejemplo. Pero es una “prueba de concepto” para ir viendo de usar esta idea de múltiples threads. Parte del método Run en el worker role:

QueueUtilities qutil = new QueueUtilities(account);
CloudQueue queue = qutil.CreateQueueIfNotExists("numbers");
CloudQueueClient qclient = account.CreateCloudQueueClient();
for (int k=0; k<11; k++) 
{
    CloudQueue q = qclient.GetQueueReference("numbers");
    MessageProcessor p = new MessageProcessor(q, this.ProcessMessage);
    p.Start();
}
MessageProcessor processor = new MessageProcessor(queue, this.ProcessMessage);
processor.Run();

El método ProcessMessage se encarga del trabajo real:

private bool ProcessMessage(CloudQueueMessage msg)
{
    int number = Convert.ToInt32(msg.AsString);
    List<int> numbers = new List<int>() { number };
    while (number > 1)
    {
        if ((number % 2) == 0)
        {
            number = number / 2;
            numbers.Add(number);
        }
        else
        {
            number = number * 3 + 1;
            numbers.Add(number);
        }
    }
    StringBuilder builder = new StringBuilder();
    builder.Append("Result:");
    foreach (int n in numbers)
    {
        builder.Append(" ");
        builder.Append(n);
    }
    Trace.WriteLine(builder.ToString(), "Information");
    return true;
}

Próximos pasos: más aplicaciones distribuidas (algoritmo genético, web crawler, etc…). También podría mejorar el ejemplo para conseguir ver los resultados en una página web, o lanzar y consultar resultados de forma remota, desde un WinForm, por ejemplo.

Nos leemos!

Angel “Java” Lopez

http://www.ajlopez.com

http://twitter.com/ajlopez

Leave a Reply

Your email address will not be published. Required fields are marked *