基于folly的AtomicIntrusiveLinkedList无锁队列…

2018-06-17 21:05:12来源:未知 阅读 ()

新老客户大回馈,云服务器低至5折

1.基于folly的AtomicIntrusiveLinkedList略微修改的无锁队列代码:

#ifndef FOLLY_REVISE_H
#define FOLLY_REVISE_H

#include <atomic>
#include <cassert>
#include <utility>

namespace folly {
    /**
    * A very simple atomic singly-linked list primitive.
    */

    /**
    * Singly-linked list node that stores a payload together with the link
    * to the following node.
    *
    * A freshly constructed node is always unlinked (next == nullptr).
    */
    template <typename T>
    struct node
    {
        T data;                 // payload, copied or moved in by the constructors
        node* next = nullptr;   // following node; nullptr while unlinked

        // Copies the payload into the node.
        node(const T& value) : data(value) { }

        // Moves the payload into the node.
        node(T&& value) : data(std::move(value)) { }
    };


    /**
    * Intrusive singly-linked list whose head pointer is a std::atomic<T*>.
    *
    * T must expose a public `T* next` member; the list links elements through
    * that member and never allocates or frees them. Elements are pushed one at
    * a time with a compare-exchange loop (insertHead) and are removed all at
    * once by atomically exchanging the head with nullptr (sweepOnce, sweep,
    * reverseSweepOnce, getInputList, getList).
    *
    * The list is move-only and must be empty when it is destroyed.
    */
    template <class T>
    class AtomicForwardList
    {
    public:
        AtomicForwardList() { }
        AtomicForwardList(const AtomicForwardList&) = delete;
        AtomicForwardList& operator=(const AtomicForwardList&) = delete;

        /**
        * Takes over the chain held by other, leaving other empty.
        * A single exchange is used so the "read head / clear head" pair
        * cannot be split by a concurrent insertion into other.
        */
        AtomicForwardList(AtomicForwardList&& other) noexcept
            : head_(other.head_.exchange(nullptr))
        {
        }

        /**
        * Takes over the chain held by other, leaving other empty.
        *
        * This list must be empty beforehand (same contract as the destructor):
        * a chain that was still attached would become unreachable, which is
        * reported through the assert in debug builds.
        *
        * Implemented directly with exchanges. No swap() overload exists for
        * this type, and std::swap is itself implemented with move assignment,
        * so a swap-based implementation would either not compile or recurse.
        */
        AtomicForwardList& operator=(AtomicForwardList&& other) noexcept
        {
            if (this != &other)
            {
                T* incoming = other.head_.exchange(nullptr);
                T* previous = head_.exchange(incoming);
                assert(previous == nullptr);
                (void)previous;     // silence "unused" warnings under NDEBUG
            }

            return *this;
        }

        /**
        * Note: list must be empty on destruction.
        */
        ~AtomicForwardList()
        {
            assert(empty());
        }

        /** @return True if no element is currently linked at the head. */
        bool empty() const
        {
            return head_.load() == nullptr;
        }

        /**
        * Atomically insert t at the head of the list.
        * t must be unlinked (t->next == nullptr) when passed in.
        * @return True if the inserted element is the only one in the list
        *        after the call.
        */
        bool insertHead(T* t)
        {
            assert(t->next == nullptr);

            auto oldHead = head_.load(std::memory_order_relaxed);
            do
            {
                t->next = oldHead;
                /* oldHead is updated by the call below.
                NOTE: we don't use next(t) instead of oldHead directly due to
                compiler bugs (GCC prior to 4.8.3 (bug 60272), clang (bug 18899),
                MSVC (bug 819819); source:
                http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange */
            } while (!head_.compare_exchange_weak(oldHead, t,
                std::memory_order_release, std::memory_order_relaxed));

            return oldHead == nullptr;
        }

        /**
        * Replaces the head with nullptr,
        * and calls func() on the removed elements in insertion order
        * (oldest element first).
        * Every element is unlinked (next == nullptr) before func() sees it.
        * @return False if the list was empty.
        */
        template <typename F>
        bool sweepOnce(F&& func)
        {
            // Default (sequentially consistent) ordering on the exchange.
            if (auto head = head_.exchange(nullptr))
            {
                auto rhead = reverse(head);
                unlinkAll(rhead, std::forward<F>(func));
                return true;
            }
            return false;
        }

        /**
        * Repeatedly replaces the head with nullptr
        * and calls func() on the removed elements in insertion order.
        * Stops when the list is empty.
        */
        template <typename F>
        void sweep(F&& func)
        {
            // func is used on every iteration, so it is passed on as an lvalue
            // rather than being forwarded more than once.
            while (sweepOnce(func))
            {
            }
        }

        /**
        * Similar to sweepOnce() but calls func() on elements in LIFO order
        * (most recently inserted element first).
        *
        * func() is called for all elements in the list at the moment
        * reverseSweepOnce() is called.
        * @return False if the list was empty.
        */
        template <typename F>
        bool reverseSweepOnce(F&& func)
        {
            // We don't loop like sweep() does because the overall order of callbacks
            // would be strand-wise LIFO, which is meaningless to callers.
            if (auto head = head_.exchange(nullptr))
            {
                unlinkAll(head, std::forward<F>(func));
                return true;
            }
            return false;
        }

        /**
        * Replaces the head with nullptr and returns the removed chain
        * relinked in insertion order (oldest element first).
        * The elements stay linked through `next`; the caller walks the chain.
        * @return Head of the chain, or nullptr if the list was empty.
        */
        T* getInputList()
        {
            // Acquire ordering here pairs with the release ordering used by
            // the compare-exchange in insertHead().
            if (auto head = head_.exchange(nullptr, std::memory_order_acquire))
            {
                return reverse(head);
            }

            return nullptr;
        }

        /**
        * Replaces the head with nullptr and returns the removed chain
        * in reversed insertion order (most recently inserted element first).
        * @return Head of the chain, or nullptr if the list was empty.
        */
        T* getList()
        {
            return head_.exchange(nullptr);
        }

    private:
        std::atomic<T*> head_{ nullptr };

        /* Reverses a linked list in place, returning the pointer to the new
        head (old tail) */
        static T* reverse(T* head)
        {
            T* rhead = nullptr;

            while (head != nullptr)
            {
                auto t = head;
                head = t->next;
                t->next = rhead;
                rhead = t;
            }

            return rhead;
        }

        /* Unlinks all elements in the linked list fragment pointed to by 'head',
        * calling func() on every element. The successor is read before func()
        * runs, so func() may destroy the element it receives. */
        template <typename F>
        static void unlinkAll(T* head, F&& func)
        {
            while (head != nullptr)
            {
                auto t = head;
                head = t->next;
                t->next = nullptr;
                func(t);
            }
        }
    };
}


#endif    // FOLLY_REVISE_H

2.基于上面无锁队列的封装

#ifndef COMPOSITE_ATOMIC_LIST_H
#define COMPOSITE_ATOMIC_LIST_H

/**
* Composes a multiple-producer, multiple-consumer atomic list out of
* several AtomicForwardList instances, one per consumer.
*/

#include <vector>
#include <cassert>
#include "folly_revise.h"

namespace folly
{

    template <class T>
    class CompositeAtomicList
    {
    public:
        using size_type = typename std::vector<AtomicForwardList<T>>::size_type;
    public:
        CompositeAtomicList(size_type producerNum, size_type consumerNum)
            : m_producerNum(producerNum), m_consumerNum(consumerNum)
        {
            // it is meanless if there is no producer or consumer
            assert(producerNum > 0);
            assert(consumerNum > 0);

            // the number of composite list is equal to consumer number
            m_compositeList.resize(consumerNum);
            // initialize the first insertion index of the producers
            m_producerIdxs.resize(producerNum);
            for (std::vector<size_type>::size_type si = 0; si != m_producerIdxs.size(); ++si)
            {
                m_producerIdxs[si] = si % consumerNum;
            }
        }

        CompositeAtomicList(const CompositeAtomicList&) = delete;
        CompositeAtomicList& operator=(const CompositeAtomicList&) = delete;

        //CompositeAtomicList(CompositeAtomicList&& other) noexcept
        //    : m_producerNum(other.m_producerNum), m_consumerNum(other.m_consumerNum),
        //    m_producerIdxs(std::move(other.m_producerIdxs),
        //    m_compositeList(std::move(other.m_compositeList)
        //{

        //}
        CompositeAtomicList(CompositeAtomicList&& other) noexcept = default;
        CompositeAtomicList& operator=(CompositeAtomicList&& other) noexcept = default;

        ~CompositeAtomicList() = default;

        // producer num
        size_type getProducerNum() const
        {
            return m_producerNum;
        }

        // consumer num
        size_type getConsumerNum() const
        {
            return m_consumerNum;
        }

        bool empty() const
        {
            // if there is one consumer list is not empty, the CompositeList is not empty
            for (const auto& item : m_compositeList)
            {
                if (!item.empty())
                {
                    return false;
                }
            }

            return true;
        }

        // insert node for producer number producer_num
        bool insertHead(size_type producer_num, T* t)
        {
            auto ret = m_compositeList[m_producerIdxs[producer_num]].insertHead(t);
            m_producerIdxs[producer_num] = (++m_producerIdxs[producer_num]) % m_consumerNum;
            return ret;
        }

        /**
        * A consumer function
        * consume nodes for consumer number consumer_num through
        * invoking function func for every list node in consumer_num.
        * You should invoke all consumer function for a particular consumer_num
        * within just one thread to evenly distribute the tasks.
        *
        * Recommend calling std::this_thread::yield() when this function returns false
        */
        template <typename F>
        bool sweepOnce(size_type consumer_num, F&& func)
        {
            return m_compositeList[consumer_num].sweepOnce(std::forward<F>(func));
        }

        /**
        * A consumer function
        * repeat consume nodes for consumer number consumer_num through
        * invoking function func for every list node in consumer_sum.
        * You should invoke all consumer function for a particular consumer_num
        * within just one thread to evenly distribute the tasks.
        *
        * Recommend calling std::this_thread::yield() after calling this function
        */
        template <typename F>
        void sweep(size_type consumer_num, F&& func)
        {
            m_compositeList[consumer_num].sweep(std::forward<F>(func));
        }

        /**
         * A consumer function
         * consume nodes for all consumer numbers once through
         * invoking function func for every list node in consumer_num.
         * You could invoke this function after all task handler threads terminated
         * to ensure all nodes have been consumed
         */
        template <typename F>
        void sweepAll(F&& func)
        {
            for (size_type si = 0; si != m_consumerNum; ++si)
            {
                sweepOnce(si, std::forward<F>(func));
            }
        }

        /**
        * A consumer function
        * Similar to sweepOnce() but calls func() on elements in LIFO order
        *
        * func() is called for all elements in the list at the moment
        * reverseSweepOnce() is called.
        *
        * Recommend calling std::this_thread::yield() when this function returns false
        */
        template <typename F>
        bool reverseSweepOnce(size_type consumer_num, F&& func)
        {
            return m_compositeList[consumer_num].reverseSweepOnce(std::forward<F>(func));
        }

        /**
        * A consumer function
        * get all the nodes from consumer list consumer_num in input order
        * @ return a list of node
        *
        * Recommend calling std::this_thread::yield() when this function returns nullptr
        */
        T* getInputList(size_type consumer_num)
        {
            return m_compositeList[consumer_num].getInputList();
        }

        /**
        * A consumer function
        * get all the nodes from consumer list consumer_num in reversed input order
        * @ return a list of node
        *
        * Recommend calling std::this_thread::yield() when this function returns nullptr
        */
        T* getList()
        {
            return m_compositeList[consumer_num].getList();
        }

    private:
        // the producer and consumer count
        size_type m_producerNum;
        size_type m_consumerNum;
        // the next inserted list for producers
        std::vector<size_type> m_producerIdxs;
        // the composite atomic lists
        std::vector<AtomicForwardList<T>> m_compositeList;
    };

}


#endif    // COMPOSITE_ATOMIC_LIST_H

3.测试用代码:

#include <memory>
#include <cassert>

#include <iostream>
#include <vector>
#include <thread>
#include <future>
#include <random>
#include <cmath>

#include "folly_revise.h"

#include "composite_atomic_list.h"

using namespace folly;

// Intrusive list element used by the test: an age plus the `next` link
// required by the atomic list.
struct student_name
{
    int age;
    student_name* next = nullptr;   // unlinked until inserted into a list

    student_name(int age = 0) : age(age) { }
};

using ATOMIC_STUDENT_LIST = CompositeAtomicList<student_name>;

constexpr int PRODUCE_THREAD_NUM = 10;     // number of producer threads
constexpr int CONSUME_THREAD_NUM = 5;      // number of consumer threads

// The list shared by all producer and consumer threads.
ATOMIC_STUDENT_LIST g_students(PRODUCE_THREAD_NUM, CONSUME_THREAD_NUM);

std::atomic<int> g_inserts{ 0 };    // number of nodes inserted
std::atomic<int> g_drops{ 0 };      // number of nodes consumed

std::atomic<int> g_printNum{ 0 };   // number of printOne() calls; expected to equal g_drops

std::atomic<long> g_ageInSum{ 0 };  // sum of the ages of all produced nodes
std::atomic<long> g_ageOutSum{ 0 }; // sum of the ages of all consumed nodes

// Consumer threads keep running while this is true.
std::atomic<bool> goOn{ true };

// Nodes produced by each producer thread. Do not raise this when testing:
// ages are below 100, so 2,000,000 * 100 * 10 threads is already close to the
// largest 32-bit signed value that the age sums may have to hold.
constexpr int ONE_THREAD_PRODUCE_NUM = 2000000;

// Consumer callback: records the node in the global counters, then frees it.
// The node must not be touched after this call.
inline void printOne(student_name* t)
{
    constexpr auto relaxed = std::memory_order_relaxed;
    const int age = t->age;

    g_printNum.fetch_add(1, relaxed);
    g_ageOutSum.fetch_add(age, relaxed);
    g_drops.fetch_add(1, relaxed);

    delete t;
}

// Producer thread body: creates ONE_THREAD_PRODUCE_NUM nodes with random ages
// in [1, 99] and inserts them into g_students as producer number idNo,
// updating g_ageInSum and g_inserts along the way.
void insert_students(int idNo)
{
    // Seed from std::random_device so every producer thread draws its own
    // sequence; seeding from the current time gives all threads started in
    // the same second the same seed and therefore identical ages.
    std::random_device seedSource;
    std::default_random_engine dre(seedSource());
    std::uniform_int_distribution<int> ageDi(1, 99);

    for (int i = 0; i < ONE_THREAD_PRODUCE_NUM; ++i)
    {
        int newAge = ageDi(dre);
        g_ageInSum.fetch_add(newAge, std::memory_order_relaxed);

        // The node is freed by the consumer callback once it has been swept.
        g_students.insertHead(idNo, new student_name(newAge));
        // use memory_order_relaxed to avoid affecting the list's memory ordering
        g_inserts.fetch_add(1, std::memory_order_relaxed);
    }
}

// Consumer thread body: drains the sub-list of consumer number idNo through
// printOne, yielding after every pass, until goOn reads false.
void drop_students(int idNo)
{
    for (;;)
    {
        if (!goOn.load(std::memory_order_relaxed))
        {
            return;
        }

        g_students.sweep(idNo, printOne);
        std::this_thread::yield();
    }
}

// Test driver: runs the producers and consumers concurrently, stops the
// consumers once every producer has finished, drains whatever is left and
// prints the counters so inserted/consumed totals can be compared.
int main()
{
    std::vector<std::future<void>> insert_threads;
    for (int i = 0; i != PRODUCE_THREAD_NUM; ++i)
    {
        insert_threads.push_back(std::async(std::launch::async, insert_students, i));
    }

    std::vector<std::future<void>> drop_threads;
    for (int i = 0; i != CONSUME_THREAD_NUM; ++i)
    {
        drop_threads.push_back(std::async(std::launch::async, drop_students, i));
    }

    // Wait until every producer has inserted all of its nodes.
    for (auto& item : insert_threads)
    {
        item.get();
    }

    // Tell the consumers to stop. The value must be passed explicitly:
    // store(std::memory_order_relaxed) would pass the memory order as the
    // value to store, which only yields false because that enumerator is 0.
    goOn.store(false, std::memory_order_relaxed);

    for (auto& item : drop_threads)
    {
        item.get();
    }

    // Consume the nodes the consumer threads had not swept before stopping.
    g_students.sweepAll(printOne);

    std::cout << "insert count1: " << g_inserts.load() << std::endl;
    std::cout << "drop count1: " << g_drops.load() << std::endl;
    std::cout << "print num1: " << g_printNum.load() << std::endl;

    std::cout << "age in1: " << g_ageInSum.load() << std::endl;
    std::cout << "age out1: " << g_ageOutSum.load() << std::endl;

    std::cout << std::endl;
}

4. AtomicIntrusiveLinkedList的插入操作一次只能插入一个节点,而移出操作则会一次移出多个节点。如果每个消费队列都使用一个AtomicIntrusiveLinkedList来存储,那么只要生产的节点能均匀分布到各个消费队列中,应该可以实现比较好的效果。不过,让生产均匀分布到各个消费队列并不那么容易实现;通过使用随机化之类的方式,可以防止人为导致的不均匀,但都不能从根本上解决问题。所以,上述方法只适合在比较容易实现生产均匀分布到各个消费队列的场景下采用。

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:【剑指offer】顺时针打印矩阵,C++实现

下一篇:7.C++类与封装的概念