Benefícios da utilização do BufferBlock em redes de fluxo de dados

gostaria de saber se há benefícios associados com o uso de um BufferBlock vinculado a um ou muitos ActionBlocks, outros de otimização (usando BoundedCapacity), em vez de apenas postar diretamente para ActionBlock(s) (contanto que a limitação não é necessário).

Author: Dimitri, 2012-10-08

3 answers

Se tudo o que você quer fazer é encaminhar itens de um bloco para vários outros, você não precisa {[[0]}.

Mas há certamente casos em que é útil. Por exemplo, se você tem uma rede de dados complexa, você pode querer construí-la a partir de sub-redes menores, cada uma criada em seu próprio método. E para fazer isso, você precisa de alguma maneira de representar um grupo de blocos. No caso que você mencionou, devolver esse único {[[0]} (provavelmente como ITargetBlock) do método seria fácil solucao.

Outro exemplo em que BufferBlock seria útil é se você quisesse enviar itens de vários blocos de origem para vários blocos de destino. Se usou BufferBlock como intermediário, não tem de ligar cada bloco de origem a cada bloco de destino.

Tenho a certeza que existem muitos outros exemplos onde você poderia usar {[[0]}. Claro que, se não vires nenhuma razão para o usares no teu caso, então não o faças.

 16
Author: svick, 2012-10-08 12:30:03
Para acrescentar à resposta do svick, há outro benefício dos bloqueios. Se você tiver um bloco com várias ligações de saída e quiser equilibrar entre elas, você tem que virar os blocos de saída para não-ganancioso e adicionar um bufferblock para lidar com a fila. Achei o seguinte exemplo útil:

Citado de um link que está agora morto:

Isto é o que estamos a planear fazer.
  • algum bloco de código irá postar dados no BufferBlock usando o seu Post(T t) metodo.
  • Este 'BufferBlock' Está ligado a 3 instâncias do 'ActionBlock' usando o método 'LinkTo t' do 'BufferBlock'.

Note, que o BufferBlock não entrega cópias dos dados de entrada a todos os blocos de destino a que está ligado.Em vez disso, fá-lo apenas para um bloco alvo.Aqui estamos esperando que quando um alvo está ocupado processando o request.It será entregue ao outro alvo.Agora vamos nos referir ao código abaixo:

static void Main(string[] args)
    {
        BufferBlock<int> bb = new BufferBlock<int>();
        ActionBlock<int> a1 = new ActionBlock<int>((a) =>
                                                    {
                                                        Thread.Sleep(100);
                                                        Console.WriteLine("Action A1 executing with value {0}", a);
                                                    }
                                                  );

        ActionBlock<int> a2 = new ActionBlock<int>((a) =>
                                                    {
                                                        Thread.Sleep(50);
                                                        Console.WriteLine("Action A2 executing with value {0}", a);
                                                    }
                                                  );
        ActionBlock<int> a3 = new ActionBlock<int>((a) =>
                                                    {
                                                        Thread.Sleep(50);
                                                        Console.WriteLine("Action A3 executing with value {0}", a);
                                                    }
                                                  );
        bb.LinkTo(a1);
        bb.LinkTo(a2);
        bb.LinkTo(a3);
        Task t = new Task(() =>
                            {
                                int i = 0;
                                while (i < 10)
                                {
                                    Thread.Sleep(50);
                                    i++;
                                    bb.Post(i);
                                }
                            }
                         );
        t.Start();
        Console.Read();
    }
Quando executado, produz o seguinte: resultado:
  • acção A1 a executar com o valor 1
  • acção A1 a executar com o valor 2
  • acção A1 a executar com o valor 3
  • acção A1 a executar com o valor 4
  • acção A1 a executar com o valor 5
  • acção A1 a executar com o valor 6
  • acção A1 a executar com o valor 7
  • acção A1 a executar com o valor 8
  • acção A1 a executar com o valor 9
  • acção A1 a executar com o valor 10
Isto mostra que apenas um alvo está realmente executando todos os dados, mesmo quando está ocupado(devido ao tópico.Sono (100) adicionado propositadamente).Por quê?

Esta é, porque todos os blocos de destino são, por padrão, ganancioso por natureza e buffers de entrada, mesmo quando eles não são capazes de processar os dados.Para alterar este comportamento, temos de definir o Ganancioso propriedade para false no DataFlowBlockOptions ao inicializar o ActionBlock como mostrado abaixo.

static void Main(string[] args)
    {
        BufferBlock<int> bb = new BufferBlock<int>();
        ActionBlock<int> a1 = new ActionBlock<int>((a) =>
                                                    {
                                                        Thread.Sleep(100);
                                                        Console.WriteLine("Action A1 executing with value {0}", a);
                                                    }
                                                  , new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
                                                                             maxDegreeOfParallelism: 1, maxMessagesPerTask: 1,
                                                                             cancellationToken: CancellationToken.None,
                                                                             //Not Greedy
                                                                             greedy: false)
                                                  );

        ActionBlock<int> a2 = new ActionBlock<int>((a) =>
                                                    {
                                                        Thread.Sleep(50);
                                                        Console.WriteLine("Action A2 executing with value {0}", a);
                                                    }
                                                    , new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
                                                                             maxDegreeOfParallelism: 1, maxMessagesPerTask: -1,
                                                                             cancellationToken: CancellationToken.None,
                                                                             greedy: false)
                                                  );
        ActionBlock<int> a3 = new ActionBlock<int>((a) =>
                                                    {
                                                        Thread.Sleep(50);
                                                        Console.WriteLine("Action A3 executing with value {0}", a);
                                                    }
                                                    , new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
                                                                             maxDegreeOfParallelism: 1, maxMessagesPerTask: -1,
                                                                             cancellationToken: CancellationToken.None,
                                                                             greedy: false)
                                                  );
        bb.LinkTo(a1);
        bb.LinkTo(a2);
        bb.LinkTo(a3);
        Task t = new Task(() =>
                            {
                                int i = 0;
                                while (i < 10)
                                {
                                    Thread.Sleep(50);
                                    i++;
                                    bb.Post(i);
                                }
                            }
                         );
        t.Start();
        Console.Read();
    }

A saída deste programa é:

  • Acção A1 executar com o valor 1
  • acção A2 a executar com o valor 3
  • acção A1 a executar com o valor 2
  • Acção A3 a executar com valor 6 Acção A3 a executar com valor 7 Acção A3 a executar com valor 8
  • acção A2 a executar com o valor 5
  • Acção A3 a executar com valor 9
  • acção A1 a executar com o valor 4
  • acção A2 a executar com o valor 10

Esta é claramente uma distribuição dos dados através três blocos de ação como esperado.

 19
Author: VoteCoffee, 2014-10-31 11:54:56

Não, o segundo exemplo não compila por uma série de razões: só é possível definir greedy=false para um bloco de dados "agrupando" - não para um bloco de execução; e então ele tem que ser definido através de GroupingDataflowBlockOptions - não DataflowBlockOptions; e então ele é definido como um valor de propriedade "{ Greedy = false }" não como um parâmetro de construtor.

Se quiser acelerar a capacidade de um bloco de Acção, faça-o fixando o valor da propriedade de capacidade limitada de DataflowBlockOptions (embora, como o OP afirmou, Eles já estão cientes desta opção). Assim:
var a1 = new ActionBlock<int>(
            i => doSomeWork(i), 
            new ExecutionDataflowBlockOptions {BoundedCapacity = 1}
        );
 4
Author: Steve Blomeley, 2017-02-25 19:51:12