diff --git a/crazy_functions/test_project/cpp/cppipc/buffer.cpp b/crazy_functions/test_project/cpp/cppipc/buffer.cpp deleted file mode 100644 index 084b815..0000000 --- a/crazy_functions/test_project/cpp/cppipc/buffer.cpp +++ /dev/null @@ -1,87 +0,0 @@ -#include "libipc/buffer.h" -#include "libipc/utility/pimpl.h" - -#include - -namespace ipc { - -bool operator==(buffer const & b1, buffer const & b2) { - return (b1.size() == b2.size()) && (std::memcmp(b1.data(), b2.data(), b1.size()) == 0); -} - -bool operator!=(buffer const & b1, buffer const & b2) { - return !(b1 == b2); -} - -class buffer::buffer_ : public pimpl { -public: - void* p_; - std::size_t s_; - void* a_; - buffer::destructor_t d_; - - buffer_(void* p, std::size_t s, buffer::destructor_t d, void* a) - : p_(p), s_(s), a_(a), d_(d) { - } - - ~buffer_() { - if (d_ == nullptr) return; - d_((a_ == nullptr) ? p_ : a_, s_); - } -}; - -buffer::buffer() - : buffer(nullptr, 0, nullptr, nullptr) { -} - -buffer::buffer(void* p, std::size_t s, destructor_t d) - : p_(p_->make(p, s, d, nullptr)) { -} - -buffer::buffer(void* p, std::size_t s, destructor_t d, void* additional) - : p_(p_->make(p, s, d, additional)) { -} - -buffer::buffer(void* p, std::size_t s) - : buffer(p, s, nullptr) { -} - -buffer::buffer(char const & c) - : buffer(const_cast(&c), 1) { -} - -buffer::buffer(buffer&& rhs) - : buffer() { - swap(rhs); -} - -buffer::~buffer() { - p_->clear(); -} - -void buffer::swap(buffer& rhs) { - std::swap(p_, rhs.p_); -} - -buffer& buffer::operator=(buffer rhs) { - swap(rhs); - return *this; -} - -bool buffer::empty() const noexcept { - return (impl(p_)->p_ == nullptr) || (impl(p_)->s_ == 0); -} - -void* buffer::data() noexcept { - return impl(p_)->p_; -} - -void const * buffer::data() const noexcept { - return impl(p_)->p_; -} - -std::size_t buffer::size() const noexcept { - return impl(p_)->s_; -} - -} // namespace ipc diff --git a/crazy_functions/test_project/cpp/cppipc/ipc.cpp b/crazy_functions/test_project/cpp/cppipc/ipc.cpp deleted file mode 100644 index 4dc71c0..0000000 --- a/crazy_functions/test_project/cpp/cppipc/ipc.cpp +++ /dev/null @@ -1,701 +0,0 @@ - -#include -#include -#include -#include // std::pair, std::move, std::forward -#include -#include // aligned_storage_t -#include -#include -#include -#include - -#include "libipc/ipc.h" -#include "libipc/def.h" -#include "libipc/shm.h" -#include "libipc/pool_alloc.h" -#include "libipc/queue.h" -#include "libipc/policy.h" -#include "libipc/rw_lock.h" -#include "libipc/waiter.h" - -#include "libipc/utility/log.h" -#include "libipc/utility/id_pool.h" -#include "libipc/utility/scope_guard.h" -#include "libipc/utility/utility.h" - -#include "libipc/memory/resource.h" -#include "libipc/platform/detail.h" -#include "libipc/circ/elem_array.h" - -namespace { - -using msg_id_t = std::uint32_t; -using acc_t = std::atomic; - -template -struct msg_t; - -template -struct msg_t<0, AlignSize> { - msg_id_t cc_id_; - msg_id_t id_; - std::int32_t remain_; - bool storage_; -}; - -template -struct msg_t : msg_t<0, AlignSize> { - std::aligned_storage_t data_ {}; - - msg_t() = default; - msg_t(msg_id_t cc_id, msg_id_t id, std::int32_t remain, void const * data, std::size_t size) - : msg_t<0, AlignSize> {cc_id, id, remain, (data == nullptr) || (size == 0)} { - if (this->storage_) { - if (data != nullptr) { - // copy storage-id - *reinterpret_cast(&data_) = - *static_cast(data); - } - } - else std::memcpy(&data_, data, size); - } -}; - -template -ipc::buff_t make_cache(T& data, std::size_t size) { - auto ptr = ipc::mem::alloc(size); - std::memcpy(ptr, &data, (ipc::detail::min)(sizeof(data), size)); - return { ptr, size, ipc::mem::free }; -} - -struct cache_t { - std::size_t fill_; - ipc::buff_t buff_; - - cache_t(std::size_t f, ipc::buff_t && b) - : fill_(f), buff_(std::move(b)) - {} - - void append(void const * data, std::size_t size) { - if (fill_ >= buff_.size() || data == nullptr || size == 0) return; - auto new_fill = (ipc::detail::min)(fill_ + size, buff_.size()); - std::memcpy(static_cast(buff_.data()) + fill_, data, new_fill - fill_); - fill_ = new_fill; - } -}; - -auto cc_acc() { - static ipc::shm::handle acc_h("__CA_CONN__", sizeof(acc_t)); - return static_cast(acc_h.get()); -} - -IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept { - return (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align; -} - -IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept { - return ipc::make_align(alignof(std::max_align_t), align_chunk_size( - ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic)) + size)); -} - -struct chunk_t { - std::atomic &conns() noexcept { - return *reinterpret_cast *>(this); - } - - void *data() noexcept { - return reinterpret_cast(this) - + ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic)); - } -}; - -struct chunk_info_t { - ipc::id_pool<> pool_; - ipc::spin_lock lock_; - - IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept { - return ipc::id_pool<>::max_count * chunk_size; - } - - ipc::byte_t *chunks_mem() noexcept { - return reinterpret_cast(this + 1); - } - - chunk_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { - if (id < 0) return nullptr; - return reinterpret_cast(chunks_mem() + (chunk_size * id)); - } -}; - -auto& chunk_storages() { - class chunk_handle_t { - ipc::shm::handle handle_; - - public: - chunk_info_t *get_info(std::size_t chunk_size) { - if (!handle_.valid() && - !handle_.acquire( ("__CHUNK_INFO__" + ipc::to_string(chunk_size)).c_str(), - sizeof(chunk_info_t) + chunk_info_t::chunks_mem_size(chunk_size) )) { - ipc::error("[chunk_storages] chunk_shm.id_info_.acquire failed: chunk_size = %zd\n", chunk_size); - return nullptr; - } - auto info = static_cast(handle_.get()); - if (info == nullptr) { - ipc::error("[chunk_storages] chunk_shm.id_info_.get failed: chunk_size = %zd\n", chunk_size); - return nullptr; - } - return info; - } - }; - static ipc::map chunk_hs; - return chunk_hs; -} - -chunk_info_t *chunk_storage_info(std::size_t chunk_size) { - auto &storages = chunk_storages(); - std::decay_t::iterator it; - { - static ipc::rw_lock lock; - IPC_UNUSED_ std::shared_lock guard {lock}; - if ((it = storages.find(chunk_size)) == storages.end()) { - using chunk_handle_t = std::decay_t::value_type::second_type; - guard.unlock(); - IPC_UNUSED_ std::lock_guard guard {lock}; - it = storages.emplace(chunk_size, chunk_handle_t{}).first; - } - } - return it->second.get_info(chunk_size); -} - -std::pair acquire_storage(std::size_t size, ipc::circ::cc_t conns) { - std::size_t chunk_size = calc_chunk_size(size); - auto info = chunk_storage_info(chunk_size); - if (info == nullptr) return {}; - - info->lock_.lock(); - info->pool_.prepare(); - // got an unique id - auto id = info->pool_.acquire(); - info->lock_.unlock(); - - auto chunk = info->at(chunk_size, id); - if (chunk == nullptr) return {}; - chunk->conns().store(conns, std::memory_order_relaxed); - return { id, chunk->data() }; -} - -void *find_storage(ipc::storage_id_t id, std::size_t size) { - if (id < 0) { - ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); - return nullptr; - } - std::size_t chunk_size = calc_chunk_size(size); - auto info = chunk_storage_info(chunk_size); - if (info == nullptr) return nullptr; - return info->at(chunk_size, id)->data(); -} - -void release_storage(ipc::storage_id_t id, std::size_t size) { - if (id < 0) { - ipc::error("[release_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); - return; - } - std::size_t chunk_size = calc_chunk_size(size); - auto info = chunk_storage_info(chunk_size); - if (info == nullptr) return; - info->lock_.lock(); - info->pool_.release(id); - info->lock_.unlock(); -} - -template -bool sub_rc(ipc::wr, - std::atomic &/*conns*/, ipc::circ::cc_t /*curr_conns*/, ipc::circ::cc_t /*conn_id*/) noexcept { - return true; -} - -template -bool sub_rc(ipc::wr, - std::atomic &conns, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) noexcept { - auto last_conns = curr_conns & ~conn_id; - for (unsigned k = 0;;) { - auto chunk_conns = conns.load(std::memory_order_acquire); - if (conns.compare_exchange_weak(chunk_conns, chunk_conns & last_conns, std::memory_order_release)) { - return (chunk_conns & last_conns) == 0; - } - ipc::yield(k); - } -} - -template -void recycle_storage(ipc::storage_id_t id, std::size_t size, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) { - if (id < 0) { - ipc::error("[recycle_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); - return; - } - std::size_t chunk_size = calc_chunk_size(size); - auto info = chunk_storage_info(chunk_size); - if (info == nullptr) return; - - auto chunk = info->at(chunk_size, id); - if (chunk == nullptr) return; - - if (!sub_rc(Flag{}, chunk->conns(), curr_conns, conn_id)) { - return; - } - info->lock_.lock(); - info->pool_.release(id); - info->lock_.unlock(); -} - -template -bool clear_message(void* p) { - auto msg = static_cast(p); - if (msg->storage_) { - std::int32_t r_size = static_cast(ipc::data_length) + msg->remain_; - if (r_size <= 0) { - ipc::error("[clear_message] invalid msg size: %d\n", (int)r_size); - return true; - } - release_storage( - *reinterpret_cast(&msg->data_), - static_cast(r_size)); - } - return true; -} - -struct conn_info_head { - - ipc::string name_; - msg_id_t cc_id_; // connection-info id - ipc::detail::waiter cc_waiter_, wt_waiter_, rd_waiter_; - ipc::shm::handle acc_h_; - - conn_info_head(char const * name) - : name_ {name} - , cc_id_ {(cc_acc() == nullptr) ? 0 : cc_acc()->fetch_add(1, std::memory_order_relaxed)} - , cc_waiter_{("__CC_CONN__" + name_).c_str()} - , wt_waiter_{("__WT_CONN__" + name_).c_str()} - , rd_waiter_{("__RD_CONN__" + name_).c_str()} - , acc_h_ {("__AC_CONN__" + name_).c_str(), sizeof(acc_t)} { - } - - void quit_waiting() { - cc_waiter_.quit_waiting(); - wt_waiter_.quit_waiting(); - rd_waiter_.quit_waiting(); - } - - auto acc() { - return static_cast(acc_h_.get()); - } - - auto& recv_cache() { - thread_local ipc::unordered_map tls; - return tls; - } -}; - -template -bool wait_for(W& waiter, F&& pred, std::uint64_t tm) { - if (tm == 0) return !pred(); - for (unsigned k = 0; pred();) { - bool ret = true; - ipc::sleep(k, [&k, &ret, &waiter, &pred, tm] { - ret = waiter.wait_if(std::forward(pred), tm); - k = 0; - }); - if (!ret) return false; // timeout or fail - if (k == 0) break; // k has been reset - } - return true; -} - -template -struct queue_generator { - - using queue_t = ipc::queue, Policy>; - - struct conn_info_t : conn_info_head { - queue_t que_; - - conn_info_t(char const * name) - : conn_info_head{name} - , que_{("__QU_CONN__" + - ipc::to_string(DataSize) + "__" + - ipc::to_string(AlignSize) + "__" + name).c_str()} { - } - - void disconnect_receiver() { - bool dis = que_.disconnect(); - this->quit_waiting(); - if (dis) { - this->recv_cache().clear(); - } - } - }; -}; - -template -struct detail_impl { - -using policy_t = Policy; -using flag_t = typename policy_t::flag_t; -using queue_t = typename queue_generator::queue_t; -using conn_info_t = typename queue_generator::conn_info_t; - -constexpr static conn_info_t* info_of(ipc::handle_t h) noexcept { - return static_cast(h); -} - -constexpr static queue_t* queue_of(ipc::handle_t h) noexcept { - return (info_of(h) == nullptr) ? nullptr : &(info_of(h)->que_); -} - -/* API implementations */ - -static void disconnect(ipc::handle_t h) { - auto que = queue_of(h); - if (que == nullptr) { - return; - } - que->shut_sending(); - assert(info_of(h) != nullptr); - info_of(h)->disconnect_receiver(); -} - -static bool reconnect(ipc::handle_t * ph, bool start_to_recv) { - assert(ph != nullptr); - assert(*ph != nullptr); - auto que = queue_of(*ph); - if (que == nullptr) { - return false; - } - if (start_to_recv) { - que->shut_sending(); - if (que->connect()) { // wouldn't connect twice - info_of(*ph)->cc_waiter_.broadcast(); - return true; - } - return false; - } - // start_to_recv == false - if (que->connected()) { - info_of(*ph)->disconnect_receiver(); - } - return que->ready_sending(); -} - -static bool connect(ipc::handle_t * ph, char const * name, bool start_to_recv) { - assert(ph != nullptr); - if (*ph == nullptr) { - *ph = ipc::mem::alloc(name); - } - return reconnect(ph, start_to_recv); -} - -static void destroy(ipc::handle_t h) { - disconnect(h); - ipc::mem::free(info_of(h)); -} - -static std::size_t recv_count(ipc::handle_t h) noexcept { - auto que = queue_of(h); - if (que == nullptr) { - return ipc::invalid_value; - } - return que->conn_count(); -} - -static bool wait_for_recv(ipc::handle_t h, std::size_t r_count, std::uint64_t tm) { - auto que = queue_of(h); - if (que == nullptr) { - return false; - } - return wait_for(info_of(h)->cc_waiter_, [que, r_count] { - return que->conn_count() < r_count; - }, tm); -} - -template -static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t size) { - if (data == nullptr || size == 0) { - ipc::error("fail: send(%p, %zd)\n", data, size); - return false; - } - auto que = queue_of(h); - if (que == nullptr) { - ipc::error("fail: send, queue_of(h) == nullptr\n"); - return false; - } - if (que->elems() == nullptr) { - ipc::error("fail: send, queue_of(h)->elems() == nullptr\n"); - return false; - } - if (!que->ready_sending()) { - ipc::error("fail: send, que->ready_sending() == false\n"); - return false; - } - ipc::circ::cc_t conns = que->elems()->connections(std::memory_order_relaxed); - if (conns == 0) { - ipc::error("fail: send, there is no receiver on this connection.\n"); - return false; - } - // calc a new message id - auto acc = info_of(h)->acc(); - if (acc == nullptr) { - ipc::error("fail: send, info_of(h)->acc() == nullptr\n"); - return false; - } - auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); - auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); - if (size > ipc::large_msg_limit) { - auto dat = acquire_storage(size, conns); - void * buf = dat.second; - if (buf != nullptr) { - std::memcpy(buf, data, size); - return try_push(static_cast(size) - - static_cast(ipc::data_length), &(dat.first), 0); - } - // try using message fragment - //ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); - } - // push message fragment - std::int32_t offset = 0; - for (std::int32_t i = 0; i < static_cast(size / ipc::data_length); ++i, offset += ipc::data_length) { - if (!try_push(static_cast(size) - offset - static_cast(ipc::data_length), - static_cast(data) + offset, ipc::data_length)) { - return false; - } - } - // if remain > 0, this is the last message fragment - std::int32_t remain = static_cast(size) - offset; - if (remain > 0) { - if (!try_push(remain - static_cast(ipc::data_length), - static_cast(data) + offset, - static_cast(remain))) { - return false; - } - } - return true; -} - -static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { - return send([tm](auto info, auto que, auto msg_id) { - return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { - if (!wait_for(info->wt_waiter_, [&] { - return !que->push( - [](void*) { return true; }, - info->cc_id_, msg_id, remain, data, size); - }, tm)) { - ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); - if (!que->force_push( - clear_message, - info->cc_id_, msg_id, remain, data, size)) { - return false; - } - } - info->rd_waiter_.broadcast(); - return true; - }; - }, h, data, size); -} - -static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { - return send([tm](auto info, auto que, auto msg_id) { - return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { - if (!wait_for(info->wt_waiter_, [&] { - return !que->push( - [](void*) { return true; }, - info->cc_id_, msg_id, remain, data, size); - }, tm)) { - return false; - } - info->rd_waiter_.broadcast(); - return true; - }; - }, h, data, size); -} - -static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) { - auto que = queue_of(h); - if (que == nullptr) { - ipc::error("fail: recv, queue_of(h) == nullptr\n"); - return {}; - } - if (!que->connected()) { - // hasn't connected yet, just return. - return {}; - } - auto& rc = info_of(h)->recv_cache(); - for (;;) { - // pop a new message - typename queue_t::value_t msg; - if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { - return !que->pop(msg); - }, tm)) { - // pop failed, just return. - return {}; - } - info_of(h)->wt_waiter_.broadcast(); - if ((info_of(h)->acc() != nullptr) && (msg.cc_id_ == info_of(h)->cc_id_)) { - continue; // ignore message to self - } - // msg.remain_ may minus & abs(msg.remain_) < data_length - std::int32_t r_size = static_cast(ipc::data_length) + msg.remain_; - if (r_size <= 0) { - ipc::error("fail: recv, r_size = %d\n", (int)r_size); - return {}; - } - std::size_t msg_size = static_cast(r_size); - // large message - if (msg.storage_) { - ipc::storage_id_t buf_id = *reinterpret_cast(&msg.data_); - void* buf = find_storage(buf_id, msg_size); - if (buf != nullptr) { - struct recycle_t { - ipc::storage_id_t storage_id; - ipc::circ::cc_t curr_conns; - ipc::circ::cc_t conn_id; - } *r_info = ipc::mem::alloc(recycle_t{ - buf_id, que->elems()->connections(std::memory_order_relaxed), que->connected_id() - }); - if (r_info == nullptr) { - ipc::log("fail: ipc::mem::alloc.\n"); - return ipc::buff_t{buf, msg_size}; // no recycle - } else { - return ipc::buff_t{buf, msg_size, [](void* p_info, std::size_t size) { - auto r_info = static_cast(p_info); - IPC_UNUSED_ auto finally = ipc::guard([r_info] { - ipc::mem::free(r_info); - }); - recycle_storage(r_info->storage_id, size, r_info->curr_conns, r_info->conn_id); - }, r_info}; - } - } else { - ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size); - continue; - } - } - // find cache with msg.id_ - auto cac_it = rc.find(msg.id_); - if (cac_it == rc.end()) { - if (msg_size <= ipc::data_length) { - return make_cache(msg.data_, msg_size); - } - // gc - if (rc.size() > 1024) { - std::vector need_del; - for (auto const & pair : rc) { - auto cmp = std::minmax(msg.id_, pair.first); - if (cmp.second - cmp.first > 8192) { - need_del.push_back(pair.first); - } - } - for (auto id : need_del) rc.erase(id); - } - // cache the first message fragment - rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, msg_size) }); - } - // has cached before this message - else { - auto& cac = cac_it->second; - // this is the last message fragment - if (msg.remain_ <= 0) { - cac.append(&(msg.data_), msg_size); - // finish this message, erase it from cache - auto buff = std::move(cac.buff_); - rc.erase(cac_it); - return buff; - } - // there are remain datas after this message - cac.append(&(msg.data_), ipc::data_length); - } - } -} - -static ipc::buff_t try_recv(ipc::handle_t h) { - return recv(h, 0); -} - -}; // detail_impl - -template -using policy_t = ipc::policy::choose; - -} // internal-linkage - -namespace ipc { - -template -ipc::handle_t chan_impl::inited() { - ipc::detail::waiter::init(); - return nullptr; -} - -template -bool chan_impl::connect(ipc::handle_t * ph, char const * name, unsigned mode) { - return detail_impl>::connect(ph, name, mode & receiver); -} - -template -bool chan_impl::reconnect(ipc::handle_t * ph, unsigned mode) { - return detail_impl>::reconnect(ph, mode & receiver); -} - -template -void chan_impl::disconnect(ipc::handle_t h) { - detail_impl>::disconnect(h); -} - -template -void chan_impl::destroy(ipc::handle_t h) { - detail_impl>::destroy(h); -} - -template -char const * chan_impl::name(ipc::handle_t h) { - auto info = detail_impl>::info_of(h); - return (info == nullptr) ? nullptr : info->name_.c_str(); -} - -template -std::size_t chan_impl::recv_count(ipc::handle_t h) { - return detail_impl>::recv_count(h); -} - -template -bool chan_impl::wait_for_recv(ipc::handle_t h, std::size_t r_count, std::uint64_t tm) { - return detail_impl>::wait_for_recv(h, r_count, tm); -} - -template -bool chan_impl::send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { - return detail_impl>::send(h, data, size, tm); -} - -template -buff_t chan_impl::recv(ipc::handle_t h, std::uint64_t tm) { - return detail_impl>::recv(h, tm); -} - -template -bool chan_impl::try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { - return detail_impl>::try_send(h, data, size, tm); -} - -template -buff_t chan_impl::try_recv(ipc::handle_t h) { - return detail_impl>::try_recv(h); -} - -template struct chan_impl>; -// template struct chan_impl>; // TBD -// template struct chan_impl>; // TBD -template struct chan_impl>; -template struct chan_impl>; - -} // namespace ipc diff --git a/crazy_functions/test_project/cpp/cppipc/policy.h b/crazy_functions/test_project/cpp/cppipc/policy.h deleted file mode 100644 index 8959607..0000000 --- a/crazy_functions/test_project/cpp/cppipc/policy.h +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once - -#include - -#include "libipc/def.h" -#include "libipc/prod_cons.h" - -#include "libipc/circ/elem_array.h" - -namespace ipc { -namespace policy { - -template