Мириады запущенных задач на C# +26


Недавно на ресурсе Medium были опубликованы две статьи от одного и того же автора, затрагивающие функциональность C# async/await.


Основными выводами были:


  • рекурсивный вызов асинхронного метода в C# подвержен StackOverflowException
  • goroutine'ы лучше задач (тасков) в .NET в плане производительности

Но главная проблема вышеприведенных публикаций — абсолютное непонимание модели кооперативной многозадачности в C# с вводом читателей в заблуждение. Сами же бенчмарки — бессмысленные, как мы увидим позже.


Далее в статье я попытаюсь раскрыть суть проблемы более подробно с примерами решения.


TL;DR

После небольшой правки кода исходных примеров, реализация бенчмарка на .NET оказывается быстрее варианта Go. Попутно решаем проблему переполнения стека у рекурсивных асинхронных методов.


NB: использоваться будут свежевыпущенный .NET Core 2.0 и Go 1.8.3.


Stack overflow & async


Перейдем сразу к рассмотрению примера #1:


using System;
using System.Threading;
using System.Threading.Tasks;

namespace CSharpAsyncRecursion
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Counted to {0}.", CountRecursivelyAsync(10000).Result);
        }

        static async Task<int> CountRecursivelyAsync(int count)
        {
            if (count <= 0)
                return count;
            return 1 + await CountRecursivelyAsync(count - 1);
        }
    }
}

Консоль упадет со StackOverflowException. Печаль!


Вариант реализации tail-call оптимизации здесь не подходит, т.к. мы не собираемся править компилятор, переписывать байт-код и т.п.


Поэтому решение должно подходить для максимально общего случая.


Обычно рекурсивный алгоритм заменяют на итерационный. Но в данном случае нам это не подходит также.


На помощь приходит механизм отложенного выполнения.
Реализуем простой метод Defer:


Task<T> Defer<T, TState>(Func<TState, Task<T>> task, TState state) => 
    Task.Factory.StartNew(async s => await task((TState)s), state).Unwrap();

Для того, чтобы поставить задачу в очередь необходимо указание планировщика.
Методы Task.Run и Task.Factory.StartNew позволяют его использовать (По-умолчанию — TaskScheduler.Default, который для данного примера и так подойдет), а последний позволяет передать объект-состояние в делегат.


На даный момент Task.Factory.StartNew не подерживает обобщенные перегрузки и вряд ли будет. Если необходимо передать состояние, то либо Action<object>, либо Func<object, TResult>.

Перепишем пример, используя новый метод Defer:


static async Task Main(string[] args)
{
    Task<T> Defer<T, TState>(Func<TState, Task<T>> task, TState state) => 
        Task.Factory.StartNew(async s => await task((TState)s), state).Unwrap();

    Task<int> CountRecursivelyAsync(int count)
    {
        if (count <= 0)
            return Task.FromResult(count);

        return Defer(seed => CountRecursivelyAsync(seed - 1).ContinueWith(rec => rec.Result + 1), count);
    }

    Console.WriteLine($"Counted to {await CountRecursivelyAsync(100000)}.");
}

Оно не то, чем кажется


Для начала ознакомимся с кодом бенчмарков из этой статьи.


Код на Go:
package main

import (
    "flag";
    "fmt";
    "time"
)

func measure(start time.Time, name string) {
    elapsed := time.Since(start)
    fmt.Printf("%s took %s", name, elapsed)
    fmt.Println()
}

var maxCount = flag.Int("n", 1000000, "how many")

func f(output, input chan int) {
    output <- 1 + <-input
}

func test() {
    fmt.Printf("Started, sending %d messages.", *maxCount)
    fmt.Println()
    flag.Parse()
    defer measure(time.Now(), fmt.Sprintf("Sending %d messages", *maxCount))
    finalOutput := make(chan int)
    var left, right chan int = nil, finalOutput
    for i := 0; i < *maxCount; i++ {
        left, right = right, make(chan int)
        go f(left, right)
    }
    right <- 0
    x := <-finalOutput
    fmt.Println(x)
}

func main() {
    test()
    test()
}

C#-код:
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;

namespace ChannelsTest
{
    class Program
    {
        public static void Measure(string title, Action<int, bool> test, int count, int warmupCount = 1)
        {
            test(warmupCount, true); // Warmup
            var sw = new Stopwatch();
            GC.Collect();
            sw.Start();
            test(count, false);
            sw.Stop();
            Console.WriteLine($"{title}: {sw.Elapsed.TotalMilliseconds:0.000}ms");
        }

        static async void AddOne(WritableChannel<int> output, ReadableChannel<int> input)
        {
            await output.WriteAsync(1 + await input.ReadAsync());
        }

        static async Task<int> AddOne(Task<int> input)
        {
            var result = 1 + await input;
            await Task.Yield();
            return result;
        }

        static void Main(string[] args)
        {
            if (!int.TryParse(args.FirstOrDefault(), out var maxCount))
                maxCount = 1000000;
            Measure($"Sending {maxCount} messages (channels)", (count, isWarmup) => {
                var firstChannel = Channel.CreateUnbuffered<int>();
                var output = firstChannel;
                for (var i = 0; i < count; i++) {
                    var input = Channel.CreateUnbuffered<int>();
                    AddOne(output.Out, input.In);
                    output = input;
                }
                output.Out.WriteAsync(0);
                if (!isWarmup)
                    Console.WriteLine(firstChannel.In.ReadAsync().Result);
            }, maxCount);
            Measure($"Sending {maxCount} messages (Task<int>)", (count, isWarmup) => {
                var tcs = new TaskCompletionSource<int>();
                var firstTask = AddOne(tcs.Task);
                var output = firstTask;
                for (var i = 0; i < count; i++) {
                    var input = AddOne(output);
                    output = input;
                }
                tcs.SetResult(-1);
                if (!isWarmup)
                    Console.WriteLine(output.Result);
            }, maxCount);
        }
    }
}

Что брасается сразу в глаза:


  1. Сам пример (что для Go, что для C#) весьма странен. Все сводится к эмуляции цепочки действий и их лавинообразном 'спуске'. Более того в Go создается chan int на каждую итерацию из 1 млн. Это вообще best-practice??
  2. автор использует Task.Yield(), оправдывая это тем, что иначе пример упадет с StackOverflowException. С таким же успехом мог бы и Task.Delay задействовать. Зачем мелочиться-то?! Но, как увидели ранее, все проистекает из-за 'неудачного' опыта с рекурсивными вызовами асинхронных методов.
  3. Изначально в примерах также фигурирует бета-версия System.Threading.Tasks.Channels для сравнения с каналами в Go. Я решил оставить только пример с тасками, т.к. библиотека System.Threading.Tasks.Channels еще не выпущена официально.
  4. Вызов GC.Collect() после прогрева. Боюсь, я откажусь от такого сомнительного преимущества.

Go использует понятие goroutine — легковесных потоков. Соответственно каждая горутина имеет свой стек. На данный момент размер стека равен 2KB. Поэтому при запуске бенчмарков будьте осторожны (более 4GB понадобиться)!

С одной стороны, это может быть полезно CLR JIT'у, а с другой — Go переиспользует уже созданные горутины, что позволяет исключить замеры трат на выделение памяти системой.

Результаты до оптимизации


Среда тестирования:


  • Core i7 6700HQ (3.5 GHz)
  • 8 GB DDR4 (2133 MHz)
  • Win 10 x64 (Creators Update)

Ну что ж, у меня получились следующие результаты:


Warmup (s) Benchmark (s)
Go 9.3531 1.0249
C# - 1.3568

NB: Т.к. пример реализует просто цепочку вызовов, то ни GOMAXPROCS, ни размер канала не влияют на результат (уже проверено опытным путем). В расчет берем наилучшее время. Флуктуации не совсем важны, т.к. разница большая.


Да, действительно: Go опережает C# на ~30%.
Challange accepted!

Используй TaskScheduler, Luke!


Если не использовать что-то наподобие Task.Yield, то снова будет StackOverflowException.


На этот раз не будем использовать Defer!


Мысль реализации проста: запускаем доп. поток, который слушает/обрабатывает задачи по очереди.


По-моему, легче реализовать собственный планировщик, чем контекст синхронизации.
Сам класс TaskScheduler выглядит так:


// Represents an object that handles the low-level work of queuing tasks onto threads.
public abstract class TaskScheduler
{
    /* остальные методы */

    public virtual int MaximumConcurrencyLevel { get; }

    public static TaskScheduler FromCurrentSynchronizationContext();

    protected abstract IEnumerable<Task> GetScheduledTasks();

    protected bool TryExecuteTask(Task task);

    protected abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued);

    protected internal abstract void QueueTask(Task task);

    protected internal virtual bool TryDequeue(Task task);
}

Как мы видим, TaskScheduler уж реализует подобие очереди: QueueTask и TryDequeue.


Дабы не изобретать велосипед, воспользуемся уже готовыми планировщиками от команды .NET.


Внимание! Камера! Мотор!


Перепишем это дело на C# 7, делая его максимально приближенным к Go:


static async Task Main(string[] args)
{
    void Measure(string title, Action<int, bool> test, int count, int warmupCount = 1
    {
        var sw = new Stopwatch();
        sw.Start();
        test(count, false);
        sw.Stop();
        Console.WriteLine($"{title}: {sw.Elapsed.TotalMilliseconds:0.000}ms");
    }
    async Task<int> f(Task<int> input)
    {
        return 1 + await input; // return output
    }
    await Task.Factory.StartNew(() =>
    {
        if (!int.TryParse(args.FirstOrDefault(), out var maxCount))
            maxCount = 1000000;
        Measure($"Sending {maxCount} messages (Task<int>)", (count, isWarmup) => {
            var tcs = new TaskCompletionSource<int>();
            (var left, var right) = ((Task<int>)null, f(tcs.Task));
            for (var i = 0; i < count; i++)
            {
                left = f(right);
                right = left;
            }
            tcs.SetResult(-1);
            if (!isWarmup)
                Console.WriteLine(right.Result);
        }, maxCount);
    }, CancellationToken.None, TaskCreationOptions.None, new StaTaskScheduler(2));
}

Здесь необходимо сделать пару ремарок:


  • GC.Collect() убираем как и говорилось выше
  • Используем StaTaskScheduler с двумя вспомогательными потоками, чтобы избежать блокировки: один ждет результата из главной/последней задачи, а др. обрабатывает саму цепочку задач.

Проблема рекурсивных вызовов исчезает автоматически. Поэтому смело убираем из метода f(input) вызов Task.Yield(). Если этого не сделать, то можно ожидать чуть более лучший результат по сравнению с исходным, т.к. дефолтный планировщик использует ThreadPool.


Теперь публикуем релизную сборку:
dotnet publish -c release -r win10-x64


И запускаем...



Внезапно получаем около 600 ms вместо прежних 1300 ms. Not bad!
Go, напомню, отрабатывал на уровне 1000 ms. Но меня не покидает чувство неуместности использования каналов как средство кооперативной многозадачности в исходных примерах.


p.s.
Я не делал огромного количества прогонов тестов с полноценной статистикой распределения значений замеров специально.
Цель статьи заключалась в освещении определенного use-case'a async/await и попутного развенчания мифа о невозможности рекурсивного вызова асинхронных методов в C#.


p.p.s.
Причиной изначального отставания C# было использование Task.Yield(). Постоянное переключение контекста — не есть гуд!




К сожалению, не доступен сервер mySQL