Asincronia

di Andrea Zani, in .NET4,

Preferirei evitare la riesumazione di vecchio codice. Provo quasi disgusto quando, aprendo un progetto con la versione installata di Visual Studio, viene richiesta la conversione del progetto per questa versione. next, next con la speranza che alla fine della procedura non vengano visualizzati strani errori. Il progetto in questione era un tool che connettendosi ad un server scaricava in un formato accessibile uno o più stream di dati proprietari. Scendendo nel dettaglio del puro codice, non era altro che una console application che, leggendo un file di configurazione, si connetteva ad un server remoto avviando uno o più thread, uno per ogni stream di dati, per il download e il salvataggio di dati. Funzionava. Mettere le mani ad una applicazione che funziona solo per farla funzionare meglio potrebbe portare due possibili conclusioni: ottenere l'effetto voluto con prestazioni migliori e/o consumo di risorse inferiore, oppure fare crollare tutta la delicata impalcatura che permetteva il funzionamento corretto dell'applicativo e dover poi tornare alla versione precedente - solo in questo caso è meglio restare in silenzio che passare per idioti.Il mio intento era ovviamente il primo. Il tutto funzionava in modo semplice, come detto: per ogni connessione richiesta aprivo un nuovo thread. E' questa la strada migliore giusta a questa evoluzione del framework .net? Assolutamente no visto che la creazione di thread ha un certo peso sia per la cpu che per il consumo di risorse. Partendo dall'inizio per rendere il tutto più comprensibile qui di seguito un esempio che, all'opposto di quello che mi ha ispirato per l'approfondimento, non cercherà di eseguire le connessioni su un server, ma sarà lui a poter ricevere un numero pressoché infinito di richieste in entrata. Ecco la prima versione del codice:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace Server2
{
    class Program
    {
        static void Main(string[] args)
        {
            List<Thread> coll = new List<Thread>();
            TcpListener listnet = new TcpListener(IPAddress.Any, 4001);
            listnet.Start();
            Console.WriteLine("Wait connection...");
            while (true)
            {
                TcpClient client = listnet.AcceptTcpClient();
                Thread th = new Thread(Accept);
                coll.Add(th);
                th.Start(client);
            }
        }
        static void Accept(object clientObject)
        {
            using (TcpClient client = (TcpClient)clientObject)
            using (NetworkStream n = client.GetStream())
            {
                while (true)
                {
                    string requestText = string.Empty;
                    while (true)
                    {
                        byte[] data = new byte[1000];
                        int byteRead = n.Read(data, 0, data.Length);
                        requestText += System.Text.Encoding.Default.GetString(data, 0, byteRead);
                        if (requestText.EndsWith("\r\n")) break;
                    }
                    if (requestText.Trim().ToLower() == "quit") break;
                    byte[] returnText = System.Text.Encoding.Default.GetBytes("ciao " + requestText);
                    n.Write(returnText, 0, returnText.Length);
                    n.Flush();
                }
            }
        }
    }
}
Banale come codice, non fa altro che attendere una connessione tcp alla porta 4001 e ogni stringa inviata viene ritornata al client fino a quando questo invia la stringa quit. Avviato è possibile connettersi anche via telnet:telnet In caso le connessioni in entrata fossero molte - si parla di qualche migliaio - il programma continuerebbe a funzionare senza problemi ma ci sarebbe un aumento di consumo di risorse (memoria) evidente.Vediamo la prima alternativa all'uso dei thread. La soluzione più semplice è lasciare tutto al ThreadPool che sarà in grado di caricare e scaricare i thread necessari. Ecco il codice con solo le modifiche al codice precedente:
        static void Main(string[] args)
        {
            ThreadPool.SetMaxThreads(50, 50); // <- aumentare il numero in caso
            ThreadPool.SetMinThreads(50, 50); // <- minimo di thread creati
            TcpListener listnet = new TcpListener(IPAddress.Any, 4000);
            listnet.Start();
            while (true)
            {
                TcpClient client = listnet.AcceptTcpClient();
                ThreadPool.QueueUserWorkItem(Accept, client);
            }
        }
Interessante la possibilità di definire il numero minimo e massimo di thread che il threadpool dovrà gestire. Nell'esempio si è definito come numero massimo il valore 50:
            ThreadPool.SetMaxThreads(50, 50);
Cosa succede alla cinquantunesi connessione? Rimarrà in coda fino a quando uno dei thread già allocato sarà liberato - detto in soldoni, quando un utente si disconnette dal nostra applicazione.Miglioriamo ancora il tutto: metodi asincroni. Questo ci permette di scaricare thread e di usarne il minor numero possibile in ogni caso con un consumo esiguo di cpu e memoria. L'esempio successivo è preso con pochissime modifiche da un noto libro della O'Reilly.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace Server3
{
    class Program
    {
        static void Main(string[] args)
        {
            TcpListener listnet = new TcpListener(IPAddress.Any, 4002);
            listnet.Start();
            Console.WriteLine("Wait connection3...");
            while (true)
            {
                TcpClient client = listnet.AcceptTcpClient();
                new MyEcho().Begin(client, null, null);
            }
        }
    }

    public class MyEcho : IAsyncResult
    {
        TcpClient _client;
        NetworkStream _stream;
        object _userState;
        ManualResetEvent _waitHandle = new ManualResetEvent(false);
        int _bytesRead = 0;
        byte[] _data = new byte[10000];
        Exception _exception;

        internal MyEcho() { }

        public object AsyncState { get { return _userState; } }
        public WaitHandle AsyncWaitHandle { get { return _waitHandle; } }
        public bool CompletedSynchronously { get { return false; } }
        public bool IsCompleted { get { return _waitHandle.WaitOne(0, false); } }

        internal void Begin(TcpClient c, AsyncCallback callback, object state)
        {
            _client = c;
            _userState = state;
            _stream = _client.GetStream();

            Task.Factory.StartNew(Read).ContinueWith(ant =>
            {
                _exception = ant.Exception;
                if (_stream != null)
                {
                    try
                    {
                        _stream.Close();
                    }
                    catch (Exception ex)
                    {
                        _exception = ex;
                    }
                    _waitHandle.Set();

                    if (callback != null) callback(this);
                }
            }, TaskContinuationOptions.OnlyOnFaulted);
        }
        internal byte[] End()
        {
            AsyncWaitHandle.WaitOne();
            if (_exception!=null) throw _exception;
            return _data;
        }

        void Read()
        {
            Task<int> readChunck = Task<int>.Factory.FromAsync(
                _stream.BeginRead, _stream.EndRead,
                _data, _bytesRead,_data.Length-_bytesRead, null);
            readChunck.ContinueWith(ContinueRead,
                TaskContinuationOptions.NotOnFaulted
                | TaskContinuationOptions.AttachedToParent);
        }
        void ContinueRead(Task<int> readChunk)
        {
            _bytesRead += readChunk.Result;
            string requestText = System.Text.Encoding.Default.GetString(_data, 0, _bytesRead);
            if (!requestText.EndsWith("\r\n"))
            {
                Read();
                return;
            }
            byte[] returnText = System.Text.Encoding.Default.GetBytes("ciao3 " + requestText);
            Task.Factory.FromAsync(_stream.BeginWrite, _stream.EndWrite,
                returnText, 0, returnText.Length, null);
            _bytesRead = 0;
            if (requestText.Trim().ToLower() != "quit")
            {
                Read();
            }
            else
            {
                _stream.Dispose();
            }
        }
    }
}
Le cose si fanno più complesse dei primi due esempi mostrati. Creata una classe apposita che esponga l'interfaccia IAsync, classe MyEcho, espone dei metodi che possono essere richiamati in modo asincrono. Le due righe di codice in main:
TcpClient client = listnet.AcceptTcpClient();
new MyEcho().Begin(client, null, null);
La prima attende una chiamata tcp da un client, la seconda lancia il metodo Begin dalla nostra classe asincrona continuando la sua esecuzione nel ciclo main. Nel dettaglio, Begin avvia con ask- nuovo oggetto presente nel framework dalla versione 4 per una gestione più facile dei thread - una funziona asincrona la quale contiene al suo interno un'altra chiamata all'oggetto askma con il metodo FromAsyncAnche se non è mio codice proviamo a dare una spiegazione abbastanza comprensibile. Innanzitutto:Task.Factory.StartNew(Read)Avvia in modo asincrono, e continuando l'esecuzione alle righe successive, del metodo Read. Questo metodo è interessante perché mostra un approccio molto utile di FromAsync. Ma prima di questo ancora un attimo ci si sofferma sulla sintassi di Task molto utile:
Task.Factory.StartNew(Read).ContinueWith(ant =>
{
...
ContinueWith permette di inserire una funzione che sarà eseguita quando Read sarà terminato. Il bello è che è possibile creare anche un array di Task finiti i quali richiamare una funzione (utile per riorganizzare l'elaborazione in parallelo precedente). Ecco un esempio che non riguarda il codice visto finora presa dall'msdn:
Task<int>[] tasks = new Task<int>[2];
            tasks[0] = new Task<int>(() =>
            {
                // Do some work...  
                return 34;
            });

            tasks[1] = new Task<int>(() =>
            {
                // Do some work... 
                 return 8;
            });

            var continuation = Task.Factory.ContinueWhenAll(
                            tasks,
                            (antecedents) =>
                            {
                                int answer = tasks[0].Result + tasks[1].Result;
                                Console.WriteLine("The answer is {0}", answer);
                            });

            tasks[0].Start();
            tasks[1].Start();
            continuation.Wait();
Finiti i due task i quali ritornano un numero intero, con ContinueWhenAll li possiamo sommare. Inoltre si può specificare se la funzione debba avvenire in determinate circostante. Per esempio, se il Task principale può ritornare una Exception, possiamo decidere qualche funziona richiamare in caso questo termini correttamente o no:
Task.Factory.StartNew(myFunction).ContinueWith(ant => {...ok...},TaskContinuationOptions.OnlyOnFaulted);
Task.Factory.StartNew(myFunction).ContinueWith(ant => {...Error...},TaskContinuationOptions.OnlyOnFaulted);
Ritornado all'esempio del server Tcp, ecco il codice interessante:
Task<int> readChunck = Task<int>.Factory.FromAsync(
                _stream.BeginRead, _stream.EndRead,
                _data, _bytesRead,_data.Length-_bytesRead, null);
E' interessante questo punto, perché il Framework ci viene in aiuto per richiamare vecchi metodi rispettosi del pattern APM: la classe Stream è uno di questi, visto che possiede i due metodi Begin e End. FromAsync permette appunto l'utilizzo di questi vecchi oggetti e accetta in sequenza, il metodo da richiamare Begin, End, quindi i parametri accettati dal metodo Begin. Per esempio:
public virtual IAsyncResult BeginWrite(
    byte[] buffer,
    int offset,
    int count,
    AsyncCallback callback,
    Object state
)
Vengono ripresi e inviati al metodo FromAsync con l'unica eccezione del callBack che viene passato come secondo parametro.Ritornando al codice, i bytes inviati via tcp saranno concatenati in _data fino a quando è riscontrato il codice fine riga (questo controllo viene fatto nella funzione ContinueRead. Il tutto si chiude scrivendo, come visto sopra, quit con il tasto invio.Quest'ultimo esempio si potrebbe considerare come tecnica ideale per quanto da me richiesto, ma c'è una strada ancora migliore sempreché possiamo utilizzare il Framework 4.5. Da questa versione è stata introdotto in modo pressoché completo l'uso dei metodi asincroni facilitando la loro creazione e il loro uso in modo talmente banale da renderlo quasi pericoloso. Il pattern utilizzato è il TAP (Task Asynchronous Pattern), e all'interno del Framework 4.5 sono state ampliate una marea di classi per l'uso di questo pattern, riconoscibili dalla presenza di nomi di funzione terminanti con Async. L'esempio sincrono seguente:
void First()
{
int result=Calc(12);
Console.WriteLine(result);
}
int Calc(int i)
{
return i*i;
}
Viene trasformata in asincrono scrivendola in questo modo (anche se di nulla praticità):
async Task First()
{
int result= await Calc(12);
Console.WriteLine(result);
}
async Task<int> Calc(int i)
{
return i*i;
}
La prima modifica visibile è che i metodi void vengono trasformati in modo che possano tornare un oggetto Task (anche se nel corpo del metodo non ritorniamo alcunché, sarà async a fare lo sporco lavoro dietro le quinte per noi) partendo dalla regola principale che non si possono cambiare metodi asincroni da metodi senza la dichiarazione async, marcati sia il metodo First che Calc con questa parola chiave.L'utilità potrebbe mostrarsi nell'esempio qui sopra se la funzione del calcolo fosse molto più lenta.
        async Task First()
        {
            int result = 0;
            Task<int> result2 = Calc(12);
            Console.Write("Before");
            result=await result2;
            Console.WriteLine(":: "+result);
        }

        async Task<int> Calc(int i)
        {
            await Task.Delay(1000);
            return i * i;
        }
Ora quando il codice richiama la funzione Calc lo farà con un altro thread, mentre il codice della funzione First continuerà la sua esecuzione mostrando la scritta "Before" e solo alla riga con l'attesa del risultato del calcolo, rimarrà in attesa della funzione eseguita nel secondo thread. Nulla vieterebbe di chiamare più volte la funzione in modo che vengano eseguiti in parallelo più calcoli:
            Task<int> result2 = Calc(12);
            Task<int> result3 = Calc(13);
            Console.Write("Before");
            result=await result2;
            resultx=await result3;
// oppure:
int[] results=await Task.WhenAll(result2, result3);
Come scritto prima anche molte classi presenti fin dalla prima versione del Framework sono state ampliate. Per esempio la classe Stream (presente in System.IO), dalla versione 4.5 espone anche metodi che rispettano il pattern TAP. Dall'msdn:
using (StreamReader reader = File.OpenText(filename))
            {
                result = new char[reader.BaseStream.Length];
                await reader.ReadAsync(result, 0, (int)reader.BaseStream.Length);
            }
Ora possiamo riscrivere il codice:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace Server4
{
    class Program
    {
        static void Main(string[] args)
        {
            Task s = Start();
            s.Wait();
        }
        static async Task Start()
        {
            TcpListener listnet = new TcpListener(IPAddress.Any, 4003);
            listnet.Start();
            Console.WriteLine("Wait connection4...");
            while (true)
            {
                TcpClient client = await listnet.AcceptTcpClientAsync();
                HandleConnectionAsync(client);
            }
        }
        private static async void HandleConnectionAsync(TcpClient client)
        {
            try
            {
                using (NetworkStream n = client.GetStream())
                {
                    while (true)
                    {
                        string requestText = string.Empty;
                        while (true)
                        {
                            byte[] data = new byte[1000];
                            int byteRead = await n.ReadAsync(data, 0, data.Length);
                            requestText += System.Text.Encoding.Default.GetString(data, 0, byteRead);
                            if (requestText.EndsWith("\r\n")) break;
                        }
                        if (requestText.Trim().ToLower() == "quit") break;
                        byte[] returnText = System.Text.Encoding.Default.GetBytes("ciaoAsync " + requestText);
                        await n.WriteAsync(returnText, 0, returnText.Length);
                    }
                }
            }
            catch (Exception)
            {
            }
            finally
            {
                client.Close();
            }
        }
    }
}
La prima cosa che potrebbe saltare all'occhio è la netta semplificazione del codice che non solo è più breve, ma anche molto più leggibile se confrontato con l'esempio precedente. Utilizzando metodi appositi asincroni ora possiamo elaborare le richieste tcp in entrata:
TcpClient client = await listnet.AcceptTcpClientAsync();
E leggere e scrivere il flusso di byte in esso:
int byteRead = await n.ReadAsync(data, 0, data.Length);
await n.WriteAsync(returnText, 0, returnText.Length);
Mi piace.

Commenti

Visualizza/aggiungi commenti

| Condividi su: Twitter, Facebook, LinkedIn

Per inserire un commento, devi avere un account.

Fai il login e torna a questa pagina, oppure registrati alla nostra community.

Nella stessa categoria
I più letti del mese