我试图在共享内存上发布一些随机的东西;出于某些奇怪的原因,阅读器没有选择发件人写的东西
#include <sys/stat.h> #include <fcntl.h> #include <sys/mman.h> #include <unistd.h> #include <sys/types.h> #include <cstdio> class SHM { volatile char* _ptr; public: SHM() { const auto handle = shm_open("myTest", O_RDWR|O_CREAT, 0666); const auto size = 4 * 1024 * 1024; if (-1 == ftruncate(handle, size)) { throw; } _ptr = (volatile char*)mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0); if(_ptr == MAP_FAILED){ throw; } int rc = fchmod(handle, 0666); if (rc == -1) { throw; } } bool read(uint64_t& magic, uint64_t& time) { const uint64_t newVal = *(uint64_t*)_ptr; if (newVal != magic) { magic = newVal; printf("value changed!!!\n"); time = *(uint64_t*)(_ptr + sizeof(magic)); return true; } //printf("old value: %lu\n", newVal); return false; } void publish(const uint64_t time) { __sync_fetch_and_add((uint64_t*)_ptr, time); __sync_synchronize(); *(uint64_t*)(_ptr + sizeof(uint64_t)) = time; } };
这是发件人:
#include <ctime> #include <unistd.h> #include <cstdlib> #include <cstdint> #include "shm.h" int main() { SHM shm; timespec t; for (auto i = 0; i < 10000; i++) { if (0 == clock_gettime(CLOCK_REALTIME, &t)) { const uint64_t v = t.tv_sec * 1000 * 1000 * 1000 + t.tv_nsec; shm.publish(v); printf("published %lu\n", v); usleep(100); } } }
这是读者:
#include <iostream> #include "shm.h" int main() { SHM shm; uint64_t magic = 0; uint64_t t = 0; while (true) { if (shm.read(magic, t)) { printf("%lu, %lu\n", magic, t); } } }
如果重新启动阅读器,则阅读器确实能够读取发送方已写入的最后一个值。
但是,如果我先启动阅读器,然后再启动发送器,则阅读器不会拾取发送器写入的所有值。
为了使这个更奇怪,如果我在SHM :: read()中取消对printf语句的注释,那么读者有时可以使用。
任何的想法?
GCC版本:
g++ (GCC) 7.2.1 20170829 (Red Hat 7.2.1-1)
我发现了几个问题,但是,我不确定它们是否可以解决您的问题。
name
shm_open
/
read
publish
volatile
const uint64_t newVal = *(uint64_t volatile*)_ptr;
std::atomic
尽管涉及不同的过程,但是仍然存在相同的对象被多个执行线程访问的情况,并且这些线程中的至少一个会修改共享对象。
我进行了上述更改。使用std::atomic固定的:
class SHM { void* _ptr; public: SHM() { const auto handle = shm_open("/myTest", O_RDWR|O_CREAT, 0666); const auto size = 4 * 1024 * 1024; if (-1 == ftruncate(handle, size)) throw; _ptr = mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0); if(_ptr == MAP_FAILED) throw; } bool read(uint64_t& magic, uint64_t& time) { auto p = static_cast<std::atomic<uint64_t>*>(_ptr); const uint64_t newVal = p[0]; if (newVal != magic) { magic = newVal; printf("value changed!!!\n"); time = p[1]; return true; } return false; } void publish(const uint64_t time) { auto p = static_cast<std::atomic<uint64_t>*>(_ptr); p[0] += time; p[1] = time; } }; void sender() { SHM shm; timespec t; for (auto i = 0; i < 10000; i++) { if (0 == clock_gettime(CLOCK_REALTIME, &t)) { const uint64_t v = t.tv_sec * 1000 * 1000 * 1000 + t.tv_nsec; shm.publish(v); printf("published %lu\n", v); usleep(100); } } } void reader() { SHM shm; uint64_t magic = 0; uint64_t t = 0; while (true) { if (shm.read(magic, t)) { printf("%lu, %lu\n", magic, t); } } } int main(int ac, char**) { if(ac > 1) reader(); else sender(); }
有了std::atomic您,您可以拥有更多控制权。例如:
struct Data { std::atomic<uint64_t> time; std::atomic<uint64_t> generation; }; // ... bool read(uint64_t& generation, uint64_t& time) { auto data = static_cast<Data*>(_ptr); auto new_generation = data->generation.load(std::memory_order_acquire); // 1. Syncronizes with (2). if(generation == new_generation) return false; generation = new_generation; time = data->time.load(std::memory_order_relaxed); printf("value changed!!!\n"); return true; } void publish(const uint64_t time) { auto data = static_cast<Data*>(_ptr); data->time.store(time, std::memory_order_relaxed); data->generation.fetch_add(time, std::memory_order_release); // 2. (1) Synchronises with this store. }