ASP.NET e RabbitMQ (con un po' di Node.js)

di Andrea Zani, in .NET,

Cercherò di essere più breve. Nel post precedente ho fatto una lunga discussione sul mondo dei message broker e dei microservice. Avevo affrontato l'argomento con RabbitMQ, ma tutto quanto avevo scritto era riutilizzabile con qualsiasi altro message broker; i vantaggi di quello scelto da me è che, essendo multipiattaforma, è utilizzabile su qualsiasi sistema operativo o tecnologia. Avevo affrontato varie metodologie di utilizzo partendo con esempi semplici fino a esagerare con il quicksort via message broker. I più semplici esempi si basavano su una console application che, popolando una queue in RabbitMQ, permetteva ad una altra console application di prelevare quel messaggio ed elaborarlo (visualizzazione e poco altro nei miei esempi). Negli esempi un poco più avanzati una console application inviava un messaggio richiedendo un'elaborazione più complessa (un calcolo matematico); questo veniva prelevato ed elaborato da un'altra console application che, questa volta, invece di visualizzare il risultato e terminare l'elaborazione, inviava al message broker la risposta che veniva poi presa dal primo programma per la visualizzazione del risultato. Ecco... ci siamo... questo è il punto cruciale. Alla fine è quello che vogliamo nel 90% delle nostre necessità: un nostro programma, o una nostra pagina di una web application, richiede dei dati che, una volta ricevuti, devono essere visualizzati.

Se in una console application questo è banale per via degli eventi indipendenti per l'attesa della risposta remota, in una web application come dovremmo comportarci? Prima di arrivare al punto cruciale di questo post, vediamo una divagazione tecnologica che ci introduce all'argomento. Come scritto nel post precedente, avevo iniziato a interessarmi di message broker con Node.js. Questa tecnologia abbastanza recente - 2009 - ha preso velocemente piede anche grazie alle sue prestazioni superiori a quelle che allora andavano per la maggiore (dall'asp.net, al php e così via). Ha una particolarità tutta sua che la differenziava da qualsiasi altra tecnologia presente e usata fino a quel momento, la sua natura asincrona. Mentre il mondo si spostava sul parallelismo sfruttando la potenza delle macchine, ecco che node.js si inventa la ruota e sembra fare un salto all'indietro visto che la sua natura è strettamente single thread. La differenza sta che tutto il codice dev'essere scritto sfruttando chiamate asincrone con tanto di callback - si hanno presente le chiamate ajax? Ecco, node.js spinge quello che siamo abituati lato client anche lato server; quello che facciamo per chiamare il contenuto di una chiamata remota con ajax, lo dobbiamo fare per richiedere i dati a un database, per invocare servizi remoti e così via. E come può, dunque, node.js rispondere a chiamate parallele e avere prestazioni così mirabolanti? Semplicemente perché grazie alla sua natura single thread non deve preoccuparsi di lock e altro multiprocesso; inoltre la sua natura asincrona spinge, anzi, obbliga, a non scrivere codice che rimanga in attesa di una risposta esterna sprecando cicli macchina e risorse. Una nostra pagina web scritta in node.js necessita di una richiesta ad un database per visualizzare una tabella in una pagina; se avvengono due richieste pressoché in parallelo, in single thread di node.js inizia ad elaborare la pagina, effettua la richiesta al database e, senza aspettare la risposta del database, inizia la seconda richiesta che si bloccherà al momento della richiesta al database. Quando il database ritornerà i dati, ecco che node.js continuerà l'elaborazione della prima pagina e poi della seconda. Se poi su macchine multicore possiamo avviare più processi in parallelo di node.js in modo semplicissimo e i processi saranno completamente indipendenti, ci possiamo rendere conto delle prestazioni che si possono raggiungere. Si ricorda dell'esempio del mio post precedente sul modo di richiamare delle tabelle di un database da asp.net?

var posts = BizEntity.GetPostList();
var tags = BizEntity.GetTagList();

In node.js potrebbe essere (con il pattern promise che in node.js va per la maggiore):

var postQuery=queryPosts();
var tagsQuery=queryTags();
Q.all([postQuery, tagsQuery],function(results) {?});

Ecco, questo dimostra, se si è compreso dalle mie parole come lavora node.js, perché asp.net a confronto spreca, sorry, butta risorse inutilmente - per fortuna con async/await la storia è cambiata. Dopo questo mini corso su node.js per chi ne era completamente digiuno (gli altri sorrideranno per il livello bassissimo scelto) vediamo come è semplice sfruttare con node.js rabbitmq per richiamare un microservice remoto - mi ripeto: che può essere sulla stessa macchina o dalla parte opposta del pianeta, importante è che entrambi siano raggiungibili dallo stesso message broker.

Prima di tutto scriviamo un microservice in c# (come console application) che, sfruttando RabbitMQ, crea una queue e una exchange che saranno utilizzabili per richiedere una somma tra numeri interi (se non si sa cosa è una queue e una exchange, si riveda il post precedente).

Ecco il codice:

using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
using System.Web.Script.Serialization;

namespace MicroserviceAddiction
{
    class Program
    {
        const string ExchangeName = "ExchangeIntegerAddition";
        const string QueueName = "QueueIntegerAddition";
        static void Main(string[] args)
        {
            var jsonSerializer = new JavaScriptSerializer();
            var connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = "localhost";

            using (var Connection = connectionFactory.CreateConnection())
            {
                var ModelCentralized = Connection.CreateModel();
                ModelCentralized.QueueDeclare(QueueName, false, false, true, null);
                ModelCentralized.ExchangeDeclare(ExchangeName, ExchangeType.Direct, false, true, null);
                ModelCentralized.QueueBind(QueueName, ExchangeName, "");
                // ModelCentralized.BasicQos(0, 1, false);

                QueueingBasicConsumer consumer = new QueueingBasicConsumer(ModelCentralized);
                string consumerTag = ModelCentralized.BasicConsume(QueueName, false, consumer);

                Console.WriteLine("Wait incoming addition...");

                while (true)
                {
                    var e = (RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    IBasicProperties props = e.BasicProperties;
                    string replyQueue = props.ReplyTo;
                    string correlationId = props.CorrelationId;
                    string messageId = props.MessageId ?? "";

                    string content = Encoding.Default.GetString(e.Body);
                    Console.WriteLine("> {0}", content);

                    var calculationObj = jsonSerializer.Deserialize<AdditionServiceClass>(content);

                    calculationObj.Total = calculationObj.Number1 + calculationObj.Number2;
                    var resultJSON = jsonSerializer.Serialize(calculationObj);

#if(DEBUG)
                    Thread.Sleep(5000);
#endif

                    Console.WriteLine("< {0}", resultJSON);
                    var msgRaw = Encoding.Default.GetBytes(resultJSON);
                    IBasicProperties basicProperties = ModelCentralized.CreateBasicProperties();
                    basicProperties.CorrelationId = correlationId;
                    basicProperties.MessageId = messageId;
                    ModelCentralized.BasicPublish("", replyQueue, basicProperties, msgRaw);

                    ModelCentralized.BasicAck(e.DeliveryTag, false);
                }
            }
        }
    }
}

Se si è letto il post precedente si riconoscerà il codice usato più volte. Si crea una exchange dal nome "ExchangeIntegerAddition" e una queue dove il codice preleverà l'oggetto "AddiotionalServiceClass" in formato JSON e lo deserializzerà in modo che il codice possa gestire questo oggetto in modo nativo:

    public class AdditionServiceClass
    {
        public int Number1 { get; set; }
        public int Number2 { get; set; }
        public int Total { get; set; }
    }

Il tutto si basa su tre righe di codice:

                    var calculationObj = jsonSerializer.Deserialize<AdditionServiceClass>(content);

                    calculationObj.Total = calculationObj.Number1 + calculationObj.Number2;
                    var resultJSON = jsonSerializer.Serialize(calculationObj);

Presi dall'oggetto inviato dal message broker come la replyQueue (per avere il nome della queue dove inviare la risposta) e i riferimenti del messaggio (MessageId e CorrelationId utilizzati dal processo chiamante per identificare la risposta), si invia la risposta nel modo già affrontato. Fine. Per un argomento che tratteremo a breve, ho inserito, solo per la compilazione in debug, uno sleep di cinque secondi.

Anche se l'ho già spiegato, ho preferito utilizzare il formato JSON perché questo mi permette l'interoperabilità che mi consente ora di scrivere una web application che, una volta richiesta la pagina, richiamerà questo servizio, e ricevuta la risposta la visualizzerà nel schermo. Innanzitutto si deve preparare il tutto con poche righe di codice. Avendo già sulla propria macchina installato node.js e npm (possibilmente una delle ultime versioni), aprendo il terminale possiamo creare una directory configurare la base della nostra web application con il comando:

npm init

Rispondendo alle varie domande avremo sarà creato il file package.json come il seguente:

{

  "name": "simpletest1",

  "version": "0.0.1",

  "description": "Simple example about comunication from nodejs to rabbitmq",

  "main": "index.js",

  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },

  "author": "AZ",

  "license": "ISC",

  "dependencies": {

    "amqplib": "^0.4.1",

    "express": "^4.13.4",

    "node-uuid": "^1.4.7"

  }
}

"dependencies" inizialmente sarà vuoto, da terminale scriviamo:

npm install --save express
npm install --save amqplib
npm install --save node-uuid

Il comandi npm installerà le dipendenze di cui abbiamo bisogno... Che strano: questa procedura mi ricorda qualcosa... ah sì, la stessa adottata dalla nuova versione di asp.net vnext. Sterile polemica a parte, ora abbiamo tutto quello che ci serve. Scriviamo lo scheletro della nostra web application:

var express = require('express');
var amqp = require('amqplib/callback_api');
var uuid = require('node-uuid');
var app = express();

// codice per l'attesa di RabbiMQ

app.get('/', function (req, res) {
// code
});

// Handle 404
app.use(function(req, res) {
 res.status(404).send('404: Page not Found');
});

// Handle 500
app.use(function(error, req, res, next) {
 res.status(500).send('500: Internal Server Error');
});

var server = app.listen(8001, function () {

  var host = server.address().address
  var port = server.address().port

  console.log("Example app listening at http://%s:%s", host, port)

})

Le prime tre righe caricano le dipendenze necessarie. Express è utilizzato per facilitare l'esposizione e il routing delle nostre pagine, amqplib per l'accesso al message broker RabbitMQ, node-uuid per la creazione di guid univoche. Creata l'istanza all'oggetto app in express, possiamo definire le regole per il routing. Definendo la regola come nel modo seguente:

app.get('/', function (req, res) {
// code
// example to show message
req.end("Hello World!");
});

Facciamo in modo che, in causa di richiesta del nostro sito con il path principale, venga eleborata la funzione specificata. Di seguito sono definite le regole di routing in caso di pagina non trovata (errore 404) o di errore nel nostro codice (errore 500). Infine con "server" avviamo effettivamente il server per l'elaborazione delle pagine, nell'esempio qui sopra alla porta 8001. Una volta avviata questa web application con:

node index.js

E richiamando da browser la pagina "http://localhost:8001" sarà elaborata la funzione prima definita nel routing - e se è usato l'ultimo codice, sarà visualizzato il messaggio "Hello World!". E' arrivato il momento di inserire il codice per interfacciarsi con RabbitMQ. Vediamo come inviare la richiesta a nostro microservice che esegue il calcolo:

app.get('/', function (req, res) {
  var objToRequest = {Number1:1, Number2:2};
  var stringToRequest= JSON.stringify(objToRequest);
  var guid = uuid.v4();
  console.log("********* Request: "+stringToRequest+" id: "+guid);
  ch.publish('ExchangeIntegerAddition', '', new Buffer(stringToRequest),
    { correlationId: guid, replyTo: q.queue });  

  cacheRequest[guid]=res;
});

Viene creato un oggetto JSON con i numeri da sommare (si noti che rappresenta la classe AdditionServiceClass in C#). Creato un guid per identificare la chiamata è usato un oggetto "ch" (lo vedremo tra pochissimo come è creato) e con una singola funzione viene inviato il messaggio all'exchange dove il microservice è in attesa, e si aggiungono info sulla queue di reply e l'identificatore della chiamata. Infine è salvato l'oggetto di node.js "res", per inviare la risposta, in un array legato al guid. Ecco la potenza di node.js: grazie alla sua natura asincrona, possiamo salvare gli oggetti della risposta e eseguirli quando vogliamo; nel nostro caso quando sarebbe utile? Semplice, quando il microservice risponde alla richiesta e invia alla nostra queue private predestinata a questo scopo la risposta, possiamo visualizzarla. Ecco il codice:

var ch, q;
var cacheRequest={};

amqp.connect('amqp://localhost', function(err, conn) {
    if (err) {
    console.log("********************");
      console.log(err);
      console.log("Errore nella connessione!");
      return;
    }
  conn.createChannel(function(err, channel) {
    ch=channel;
      ch.assertQueue('', {exclusive: true, autoDelete:true}, function(err, queuex) {
        q=queuex;
          ch.consume(q.queue, function(msg) {

      var guid = msg.properties.correlationId;
      var objResult = JSON.parse(msg.content.toString());
      console.log("From RabbitMQ");
      var request = cacheRequest[guid];
      cacheRequest[guid] = null;
      request.end( "Result sum (1+2): "+objResult.Total);

        }, {noAck: true});
        
      });
  });
});

Il codice è facilmente intuibile: aperta una connessione, è creato un channel e una queue private che sarà utilizzata per la ricezione delle riposte (ricordo che una queue private permette la lettura solo dal processo che l'ha creata ma l'inserimento dei messaggi da chiunque). Infine la funzione asincrona "ch.console(...)" attende i messaggi dalla queue. Quando arriva, nel messaggio sarà letto il codice identificativo della chiamata (guid), letta la risposta nel messaggio (essendo in formato JSON diventa tutto più semplice) quindi si riprende dall'array cacheRequest l'oggetto per che ci connette al client, e inviamo definitivamente la risposta. Con poche decine di righe di codice abbiamo tutto quanto di cui abbiamo bisogno: eccezionale.

Prima di tornare nel mondo di asp.net, voglio divagare ancora un po'. Si ricorderà che nel microservice scritto in C# è stato inserito un ritardo di cinque secondi per simulare un'elaborazione pesante e/o ritardi di comunicazione. Che cosa accadrebbe se questa pagina è chiamata più volte? Ogni pagina invierà la richiesta al message broker che poi la invierà al microservice che, a sua volta, risponderà percorrendo tutta la strada a ritroso. Poco efficiente: immaginiamo che questa pagina (o questo tipo di richiesta) avvenga spesso! Primo approccio per risolvere: cache. Niente di difficile: quando riceviamo la risposta la salviamo in modo che le successive richieste inviino direttamente la risposta. Ecco il codice (indexCache.js):

var ch, q;
var cacheRequest={};
var cacheResult={};

amqp.connect('amqp://localhost', function(err, conn) {
    if (err) {
    console.log("********************");
      console.log(err);
      console.log("Errore nella connessione!");
      return;
    }
  conn.createChannel(function(err, channel) {
    ch=channel;
      ch.assertQueue('', {exclusive: true, autoDelete:true}, function(err, queuex) {
        q=queuex;
          ch.consume(q.queue, function(msg) {

      var guid = msg.properties.correlationId;
      var stringToRequest = msg.properties.messageId;
      var objResult = JSON.parse(msg.content.toString());
      cacheResult[stringToRequest]=objResult;
      console.log("From RabbitMQ");
      var request = cacheRequest[guid];
      cacheRequest[guid] = null;
      request.end( "Result sum (1+2): "+objResult.Total);

        }, {noAck: true});
        
      });
  });
});

...

app.get('/', function (req, res) {
  var objToRequest = {Number1:1, Number2:2};
  var stringToRequest= JSON.stringify(objToRequest);

  if (cacheResult[stringToRequest]) {
    console.log("From cache");
    return res.end( "Result sum (1+2): "+cacheResult[stringToRequest].Total);
  }

  var guid = uuid.v4();
  console.log("********* Request: "+stringToRequest+" id: "+guid);
  ch.publish('ExchangeIntegerAddition', '', new Buffer(stringToRequest),
    { messageId: stringToRequest, correlationId: guid, replyTo: q.queue });  

  cacheRequest[guid]=res;
});

E’ utilizzato un nuovo oggetto cacheResult che salva il contenuto della risposta. Se proviamo ora, solo la prima richiesta avrà una risposta dopo cinque secondi: tutte le successive sono immediate. C'è motivo per essere felici? No, perché cosa avviene tra la prima richiesta e quelle che avvengono fino alla risposta ben cinque secondi dopo? Semplicemente saranno eseguite n richieste fino al salvataggio nella cache della risposta. Semplificando con un grafico:

E se mettessimo in un buffer le richieste in modo che, appena ricevuta la risposta, la potessimo inviare a tutti i client?

Ecco che il primo utente fa partire la richiesta effettiva al microservice, se le successive richiedono gli stessi dati, rimane in attesa della risposta della prima richiesta. Con node.js? Ancora tutto facile (indexbatch.js):

var ch, q;
var cacheRequest={};

amqp.connect('amqp://localhost', function(err, conn) {
    if (err) {
    console.log("********************");
      console.log(err);
      console.log("Errore nella connessione!");
      return;
    }
  conn.createChannel(function(err, channel) {
    ch=channel;
      ch.assertQueue('', {exclusive: true, autoDelete:true}, function(err, queuex) {
        q=queuex;
          ch.consume(q.queue, function(msg) {

      var stringToRequest = msg.properties.correlationId;
      var objResult = JSON.parse(msg.content.toString());
      console.log("From RabbitMQ");
      var requestList = cacheRequest[stringToRequest];
      cacheRequest[stringToRequest] = null;
      requestList.forEach(function(res) {
        res.end( "Result sum (1+2): "+objResult.Total);
      });

        }, {noAck: true});
        
      });
  });
});

app.get('/', function (req, res) {
  var objToRequest = {Number1:1, Number2:2};
  var stringToRequest= JSON.stringify(objToRequest);
  console.log("********* Request: "+stringToRequest);

  if (!cacheRequest[stringToRequest]) {
    console.log("Request new");
    cacheRequest[stringToRequest]=[];
    ch.publish('ExchangeIntegerAddition', '', new Buffer(stringToRequest),
      { correlationId: stringToRequest, replyTo: q.queue });  
}
  else {
    console.log("Request cached");
  }
  cacheRequest[stringToRequest].push(res);
});

In requestList salviamo tutte le richieste "uguali". Quando riceviamo la risposta ecco che inviamo immediatamente le risposte a tutti i client:

      requestList.forEach(function(res) {
        res.end( "Result sum (1+2): "+objResult.Total);
      });

Se apriamo due browser diversi (non valgono tab differenti) e richiediamo la pagina, vedremo, grazie ai messaggi di log nel terminale, che la prima richiesta viene inviata effettivamente al message broker, mentre la seconda chiamata rimane in attesa; l'effettivo funzionamento corretto lo si verifica facilmente vedendo che entrambi i browser si aggiornano contemporaneamente.

Fine divagazione con node.js. Ora tocca ad asp.net. Facile, replichiamo lo stesso funzionamento: mettiamo in cache le richieste e quando abbiamo la risposta dal message broker le visulizziamo... asp... qualcosa non mi torna... pausa... qualcosa non torna in questo ragionamento... Alla fine mi rendo conto che... NON SI PUO'! Asp.net, almeno basandosi sulle mie conoscenze, non permette una cosa del genere! E che... come risolvo? Ok, la faccio breve... Ricapitoliamo togliendoci dalla testa node.js: asp.net è un'altra cosa. Devo fare in modo che, inviato un messaggio a un message broker alla richiesta di pagina, dobbiamo attendere la risposta perché essa sia visualizzata.

Non esiste nulla di tutto ciò. Dobbiamo fare qualcosa con ciò che il framework .net ci mette a disposizione. Mi ritorna in mente un oggetto nativo che avevo utilizzato per la distribuzione di una queue tra più thread: BlockingCollection. Questo oggetto nel namespace System.Collection.Concurrent fa parte di una serie di nuovi oggetti creati da Microsoft per l'accesso sicuro a collection in multithreading - i canonici oggetti List<>, Dictionary<> ecc... se utilizzati in multithreading necessitano di lock da parte del programmatore. ConcurrentBag<>, ConcurrentDictionary<> e così via, permettono l'accesso sicuro senza bisogno di lock, e funzionano come i tradizionali oggetti:

                    var queue1 = new System.Collections.Generic.Queue<string>();
                    queue1.Enqueue("stringa");
                    string value = queue1.Dequeue();

Questo è la classica queue presente nel Framework .net fin dall'arrivo dei generics nella versione 2.0. Aggiunta una o più stringhe possiamo preleverane una con il classico ordinamento FIFO (first in, first out). Se utilizziamo questo oggetto in un'ambiente multithread si rischiano strana inconsistenza negli oggetti ritornati. Con i nuovi oggetti a prova di "concorrenza" possiamo scrivere in un esempio un pochino più complesso:

            var queueBlocked = new System.Collections.Concurrent.ConcurrentQueue<string>();
            queueBlocked.Enqueue("stringa1");

            while (true)
            {
                string value;
                if (queueBlocked.TryDequeue(out value))
                {
                    Console.WriteLine(value);
                }
                else
                {
                    break;
                }
            }

Ora dobbiamo "provare" a prelevare il valore, in caso positivo possiamo accedere all'oggetto senza problemi altrimenti la queue è vuota e il programma si chiude. A parte questi oggetti basilari, c'è l'oggetto nominato prima: BlockingCollection.

            var queueBlocked = new System.Collections.Concurrent.BlockingCollection<string>();
            queueBlocked.Add("stringa1");

            while (true)
            {
                string value = queueBlocked.Take();
                Console.WriteLine(value);
            }

Questa volta il ciclo è infinito ma non dobbiamo preoccuparci: appena estratti tutti gli elementi dalla queue il thread si bloccherà in attesa che un secondo thread aggiunga un altro elemento. Questo lo farà senza sprecare cicli macchina o altro: il thread sarà sospeso in attesa della prossima elaborazione. In passato ho utilizzato questo oggetto con profitto per l'elaborazione di elementi da una queue da parte di più thread paralleli senza dovermi preoccupare di lock e altro. Ma la cosa importante ora è che BlockingColleciton mi consente di avere a disposizione una possibile soluzione. Se io passassi a una classe (che invia e attende un messaggio dal message broker) questo oggetto, e ne rimanessi in attesa che venga popolato con la risposta, avrei la mia soluzione valida che mi consentirebbe di scrivere:

protected void Page_Load(object sender, EventArgs e)
{
   var objToRequest = new AdditionServiceClass { Number1=1, Number2=2 };
   var blockedObj = new BlockingCollection<objToRequest>();
   SendMessage(blockedObj); // <-- example
   msg.Text = blockedObj.Take().ToString(); // <-- simple example
}

Faccio alcune prove e la cosa funziona, e pure bene: la mia classe, che vedremo dopo, che gestisce i messaggi da inviare e ricevere al message broker, è in grado di mettere in cache le richieste mentre invia solo una richiesta al microservice, e ricevuta la risposta, la invia a tutte le pagine in attesa che visualizzano contemporaneamente la risposta.

Tutto bene? No, questo oggetto ha un grave difetto da numerosi test che ho fatto anche personalmente: mette in sospensione sì il thread, ma non lo libera per altre richieste come è auspicabile che faccia in una web application in asp.net. E siamo ancora al punto di prima. Ci vorrebbe un oggetto che, come il BlockingCollection sia in grado di bloccare il thread e addirittura sia capace di liberarlo e, perché no, sia compatibile con il nuovo sistema asincrono di asp.net con l'uso di async/await. Senza doverlo ricreare da zero, avevo già cercato in precedenza un oggetto simile e l'ho trovato in una libreria esterna: "AsyncEx"

https://github.com/StephenCleary/AsyncEx

L'oggetto in questione è "BufferBlock". Come ConcurrentQueue, possiamo usarlo nello stesso modo:

BufferBlock<string> bb = new BufferBlock<string>();
string value;
try
{
    value = await bb.ReceiveAsync<string>((new CancellationTokenSource(3000)).Token);
}
catch
{
    value = "Timeout!!!";
}

Possiamo anche definire il CancellationTokenSource con il timeout: se nel tempo prestabilito non è inserito nessun oggetto, avremo una exception che potremo gestire in modo che una pagina non rimanga in attesa a tempo indeterminato una risposta che non potrebbe mai arrivare.

Alla fine ce l'abbiamo fatta: abbiamo un oggetto utile allo scopo e possiamo scrivere la nostra pagina:

<%@ Page Language="C#" AutoEventWireup="true" CodeBehind="TestRabbitMQ.aspx.cs" Inherits="AsyncPageTest.TestRabbitMQ" Async="true" %>

<!DOCTYPE html>

<html xmlns="http://www.w3.org/1999/xhtml">
<head runat="server">
    <title></title>
</head>
<body>
    <form id="form1" runat="server">
    <div>
        Message from RabbitMQ: <asp:Label ID="msg" runat="server" />
    </div>
    </form>
</body>
</html>

E il codice in C#:


        protected async void Page_Load(object sender, EventArgs e)
        {
            AdditionServiceClass value;

            var objToRequest = new AdditionServiceClass { Number1=5, Number2=2 };

            try
            {
                value = await RabbitUtilityClass.SendRequest(objToRequest);
                msg.Text = value.Total.ToString();
            }
            catch
            {
                msg.Text = "Timeout";
            }
        }

Il codice è semplicissimo: AddiotionalServiceClass ha tre property dove sono inseriti i due valori numerici da aggiungere e il risultato. Il lavoro più interessante è la classe RabbitUtilityClass. Ecco il codice completo:

    public class RabbitUtilityClass
    {
        const string ExchangeName = "ExchangeIntegerAddition";

        static ConnectionFactory _connectionFactory;
        static IConnection _connection;
        static IModel _modelCentralized;
        static string _queueName;

        static ConcurrentDictionary<string, List<BufferBlock<AdditionServiceClass>>> _cacheRequest = new ConcurrentDictionary<string, List<BufferBlock<AdditionServiceClass>>>();
        static JavaScriptSerializer jsonSerializer = new JavaScriptSerializer();

        static RabbitUtilityClass()
        {
            _connectionFactory = new ConnectionFactory();
            _connectionFactory.HostName = "localhost";
            _connection = _connectionFactory.CreateConnection();

            _modelCentralized = _connection.CreateModel();
            var queueResult = RabbitUtilityClass._modelCentralized.QueueDeclare("", false, true, true, null);
            _queueName = queueResult.QueueName;
            RabbitUtilityClass._modelCentralized.BasicQos(0, 1, false);

            QueueingBasicConsumer consumer = new QueueingBasicConsumer(RabbitUtilityClass._modelCentralized);
            string consumerTag = RabbitUtilityClass._modelCentralized.BasicConsume(_queueName, false, consumer);

            Task.Run(() =>
            {
                while (true)
                {
                    var e = (RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    string content = Encoding.Default.GetString(e.Body);
                    string messageId = e.BasicProperties.MessageId;
                    string correlationId = e.BasicProperties.CorrelationId;

                    var reply = _cacheRequest[correlationId];
                    reply.ForEach(t => t.Post(jsonSerializer.Deserialize<AdditionServiceClass>(content)));
                    _cacheRequest.TryRemove(correlationId, out reply);

                    RabbitUtilityClass._modelCentralized.BasicAck(e.DeliveryTag, false);
                }
            });
        }
        public static Task<AdditionServiceClass> SendRequest(AdditionServiceClass objRequested)
        {
            var objJson = jsonSerializer.Serialize(objRequested);
            var result = new BufferBlock<AdditionServiceClass>();

            if (_cacheRequest.ContainsKey(objJson))
            {
                _cacheRequest[objJson].Add(result);
                return result.ReceiveAsync<AdditionServiceClass>((new CancellationTokenSource(30000)).Token);
            }

            _cacheRequest[objJson] = new List<BufferBlock<AdditionServiceClass>>();
            _cacheRequest[objJson].Add(result);

            IBasicProperties basicProperties = _modelCentralized.CreateBasicProperties();
            basicProperties.MessageId = Guid.NewGuid().ToString();
            basicProperties.CorrelationId = objJson;
            basicProperties.ReplyTo = _queueName;

            Task.Run(() =>
            {
                _modelCentralized.BasicPublish(ExchangeName, "", basicProperties, Encoding.Default.GetBytes(objJson));
            });

            return result.ReceiveAsync<AdditionServiceClass>((new CancellationTokenSource(30000)).Token);
        }
    }

Gran parte del codice è per la creazione del collegamento con il nostro message broker (RabbitMQ in tutti questi esempi).

SendRequest è la funzione che abbiamo chiamato dalla nostra pagina asp.net che controlla che la richiesta è già stata inviata, in questo caso sarà ritornato un oggetto Task con la nostra risposta con il metodo ReceiveAsync di BufferBlock. Questo sarà utilizzato per bloccare la richiesta e sarà salvato in una collection utilizzata per inviare la risposta quando RabbitMQ ce la invierà.

L'istanza di classe statica "RabbitUtilityClass" crea la queue privata per l'attesa della risposta e rimane in attesa di eventuali messaggi. Come nell'esempio con la tecnologia node.js sono utilizzati le property del messaggio per riconoscere la chiamata e per popolare gli oggetti BufferBlock corretti che sbloccheranno le pagine che ne hanno fatto richiesta. Il codice appare più complesso di quello con node.js, ma fa il suo lavoro (si potrebbe migliorare ancora, come con l'aggiunta di messaggi di errore con dettagli utilizzabili).

A questo link trovate il codice sorgente.

Questa volta sono stato più breve.

Commenti

Visualizza/aggiungi commenti

| Condividi su: Twitter, Facebook, LinkedIn

Per inserire un commento, devi avere un account.

Fai il login e torna a questa pagina, oppure registrati alla nostra community.

Nella stessa categoria
I più letti del mese