我们都知道C#有现成的ThreadPool,但是为什么不手撕线程,成为县城撕裂者呢?

关于线程池

From Wikipedia: https://zh.wikipedia.org/wiki/线程池

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。

任务调度以执行线程的常见方法是使用同步队列,称作任务队列。池中的线程等待队列中的任务,并把执行完的任务放入完成队列中。

线程池模式一般分为两种:HS/HA半同步/半异步模式、L/F领导者与跟随者模式。

  • 半同步/半异步模式又称为生产者消费者模式,是比较常见的实现方式,比较简单。分为同步层、队列层、异步层三层。同步层的主线程处理工作任务并存入工作队列,工作线程从工作队列取出任务进行处理,如果工作队列为空,则取不到任务的工作线程进入挂起状态。由于线程间有数据通信,因此不适于大数据量交换的场合。
  • 领导者跟随者模式,在线程池中的线程可处在3种状态之一:领导者leader、追随者follower或工作者processor。任何时刻线程池只有一个领导者线程。事件到达时,领导者线程负责消息分离,并从处于追随者线程中选出一个来当继任领导者,然后将自身设置为工作者状态去处置该事件。处理完毕后工作者线程将自身的状态置为追随者。这一模式实现复杂,但避免了线程间交换任务数据,提高了CPU cache相似性。在ACE(Adaptive Communication Environment)中,提供了领导者跟随者模式实现。

线程池的伸缩性对性能有较大的影响。

  • 创建太多线程,将会浪费一定的资源,有些线程未被充分使用。
  • 销毁太多线程,将导致之后浪费时间再次创建它们。
  • 创建线程太慢,将会导致长时间的等待,性能变差。
  • 销毁线程太慢,导致其它线程资源饥饿。

我们这里实现的是半异步模式

C#中lock的用法

官方文档:https://docs.microsoft.com/zh-cn/dotnet/csharp/language-reference/keywords/lock-statement

lock 语句获取给定对象的互斥 lock,执行语句块,然后释放 lock。 持有 lock 时,持有 lock 的线程可以再次获取并释放 lock。 阻止任何其他线程获取 lock 并等待释放 lock。

通俗的说 lock(x) 语句可以保证x元素同时只被当前线程访问,如果其他线程要用x,那么将会等待,直到结束lock块。

注意:x必须要是引用类型 (Reference Type) 的对象, https://docs.microsoft.com/zh-cn/dotnet/csharp/language-reference/keywords/reference-types

C#中int / long / uint / ulong的线程安全操作

官方文档:https://docs.microsoft.com/en-us/dotnet/api/system.threading.interlocked?view=net-5.0

里面详细列举了如何操作。

C#中线程安全的集合类

官方文档:https://docs.microsoft.com/zh-cn/dotnet/standard/collections/thread-safe/

类型 描述
BlockingCollection<T> 为实现 IProducerConsumerCollection<T> 的所有类型提供限制和阻止功能。有关详细信息,请参阅 BlockingCollection 概述。
ConcurrentDictionary<TKey,TValue> 键值对字典的线程安全实现。
ConcurrentQueue<T> FIFO(先进先出)队列的线程安全实现。
ConcurrentStack<T> LIFO(后进先出)堆栈的线程安全实现。
ConcurrentBag<T> 无序元素集合的线程安全实现。
IProducerConsumerCollection<T> 类型必须实现以在 BlockingCollection 中使用的接口。

Thread or Task?

注意:在C#当中,两者差别不大。

一般而言,Thread是指线程,而Task是指任务,任务需要线程来实现。

stackoverflow上的县城撕裂者

https://stackoverflow.com/questions/5826981/how-to-reuse-threads-in-net-3-5

注:

public class SuperQueue<T> : IDisposable where T : class
{
    readonly object _locker = new object();
    readonly List<Thread> _workers;
    readonly Queue<T> _taskQueue = new Queue<T>();
    readonly Action<T> _dequeueAction;

    /// <summary>
    /// Initializes a new instance of the <see cref="SuperQueue{T}"/> class.
    /// </summary>
    /// <param name="workerCount">The worker count.</param>
    /// <param name="dequeueAction">The dequeue action.</param>
    public SuperQueue(int workerCount, Action<T> dequeueAction)
    {
        _dequeueAction = dequeueAction;
        _workers = new List<Thread>(workerCount);

        // Create and start a separate thread for each worker
        for (int i = 0; i < workerCount; i++)
        {
            Thread t = new Thread(Consume) { IsBackground = true, Name = string.Format("SuperQueue worker {0}",i )};
            _workers.Add(t);
            t.Start();
        }
    }

    /// <summary>
    /// Enqueues the task.
    /// </summary>
    /// <param name="task">The task.</param>
    public void EnqueueTask(T task)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(task);
            Monitor.PulseAll(_locker);
        }
    }

    /// <summary>
    /// Consumes this instance.
    /// </summary>
    void Consume()
    {
        while (true)
        {
            T item;
            lock (_locker)
            {
                while (_taskQueue.Count == 0) Monitor.Wait(_locker);
                item = _taskQueue.Dequeue();
            }
            if (item == null) return;

            // run actual method
            _dequeueAction(item);
        }
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// </summary>
    public void Dispose()
    {
        // Enqueue one null task per worker to make each exit.
        _workers.ForEach(thread => EnqueueTask(null));

        _workers.ForEach(thread => thread.Join());

    }
}

稍加改进

由于我还有想要知晓什么时候任务结束的需求,所以我设计了一个计算idleWorker数量的方法:

using System;
using System.Collections.Generic;
using System.Threading;

namespace Ayase.ThreadLib
{
    /// <summary>
    /// Custom Thread Pool Implementation.
    /// Modified from https://stackoverflow.com/questions/5826981/how-to-reuse-threads-in-net-3-5,
    /// by dashton@stackoverflow, ChrisWue@stackoverflow
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class IThreadPool<T> : IDisposable where T : class
    {
        private readonly object _locker = new object();
        private readonly object _idleLocker = new object();
        private readonly List<Thread> _workers;
        private readonly Queue<T> _taskQueue = new Queue<T>();
        private readonly Action<T> _dequeueAction;
        public readonly long WorkerCount;

        private int _idleWorkerCount = 0;


        /// <summary>
        /// The count of idle worker
        /// </summary>
        public int IdleWorkerCount
        {
            get { return _idleWorkerCount; }
        }

        public bool Idle;

        /// <summary>
        /// Initializes a new instance of the <see cref="IThreadPool{T}"/> class.
        /// </summary>
        /// <param name="workerCount">The worker count.</param>
        /// <param name="dequeueAction">The dequeue action.</param>
        public IThreadPool(int workerCount, Action<T> dequeueAction)
        {
            Idle = true;
            WorkerCount = workerCount;
            _idleWorkerCount = workerCount;
            _dequeueAction = dequeueAction;
            _workers = new List<Thread>(workerCount);

            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i ++)
            {
                Thread t = new Thread(Consume) { IsBackground = true, Name = string.Format("IThreadPool worker {0}", i) };
                _workers.Add(t);
                t.Start();
            }
        }


        /// <summary>
        /// Enqueues the task.
        /// </summary>
        /// <param name="task">The task.</param>
        public void EnqueueTask(T task)
        {
            lock (_locker)
            {
                _taskQueue.Enqueue(task);
                Idle = false;
                Monitor.PulseAll(_locker);
            }
        }


        /// <summary>
        /// Consumes this instance.
        /// </summary>
        private void Consume()
        {
            while (true)
            {
                T item;
                lock (_locker)
                {
                    while (_taskQueue.Count == 0) Monitor.Wait(_locker);
                    item = _taskQueue.Dequeue();
                }
                if (item == null) return;

                Interlocked.Decrement(ref _idleWorkerCount);
                // run actual method
                _dequeueAction(item);
                Interlocked.Increment(ref _idleWorkerCount);
                if (_idleWorkerCount == WorkerCount && _taskQueue.Count == 0)
                {
                    lock (_idleLocker)
                    {
                        Idle = true;
                        Monitor.PulseAll(_idleLocker);
                    }
                } else Idle = false;
            }
        }


        /// <summary>
        /// Waiting for everything to finish.
        /// </summary>
        public void Join()
        {
            lock (_idleLocker)
            {
                while (!Idle) Monitor.Wait(_idleLocker);
            }
            Dispose();
        }


        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            // Enqueue one null task per worker to make each exit.
            _workers.ForEach(thread => EnqueueTask(null));
            _workers.ForEach(thread => thread.Join());
        }
    }
}
最后修改:2021 年 08 月 01 日 12 : 18 PM
真的不买杯奶茶嘛....qwq