2019年4月10日 星期三

從建立以zeromq 實作的library看Vortex Lite library設計

最初為了讓zeromq實作的通訊功能能reuse而建立一套新的library
過程中發現,此library的設計與VortexLite 極為相似
因此甚至最後會猜測並沿用VortexLite的設計概念

此為個人發現及臆測,並非經過Prismtech 證實

一、簡介zeromq

zeromq是個效率佳及程式碼簡易的IPC protocol 
提供多種通訊模式,以下以Pub & Sub模式為例
sample code如下
publisher
#include "zmq.h"
void main(void)
{
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    zmq_bind (publisher , "ipc:///hello");
    while (1) {
        zmq_send (publisher , "hello zmq", 9, 0);
        sleep(1);
    }
    zmq_close (publisher );
    zmq_ctx_destroy (context);
}
subscriber
#include "zmq.h"
void main(void)
{
    void *context = zmq_ctx_new ();
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "ipc:///hello");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);
    while (1) {
        char buffer[255];
        int ret = zmq_recv(subscriber , buffer, 255, 0);
        if (ret > 0)
            printf ("[recv] %s\n", , buffer);
    }
    zmq_close (subscriber);
    zmq_ctx_destroy (context);
}
可以看到比起TCP socket API
zeromq提供更抽象的API

二、zeromq reuse問題

如果我們的應用都只是傳傳純文字字串或者小量的資料或許直接使用zeromq原生library 即可
但是假如需要傳送的是大量的raw data (如DAQ資料)、或是某個struct (各種type包在一起)
我們當然還是可以直接用原生API,但為了處理這些data,程式碼必須增加許多東西 (像是序列化,或者套json/xml格式)
可是這些增加的部分,是不是每次寫一個新的zeromq應用就必須重新寫一次?

三、library 設計

1.序列化(反序列化)

你不能直接用memcpy copy整個struct【1】
假設有一個struct
typedef struct hello {
    uint16_t id;
    char *msg;
}hello;
為了將hello這個結構轉成byte stream,可以想像我們至少要有兩個funciton,serilaize_int 及 serilaize_string
我們要讓socket data至少變成如下

每一種struct 會有一種對應的serilaize / deserilaize 流程
才能正確地解釋data
於是我們建立各種C語言data type的serilaize / deserilaize API,並組合產生各struct的serilaize / deserilaize流程
方法有很多種,其實套入xml / json 也是一種方式

2.socket bind data type

今天我跟一個英國人打電話(socket1),要用英文溝通(data type1)
情況只是1對1,或許大腦(程式)簡單
如果今天我同時還要跟一個台灣人講電話(socket2),我的大腦(程式)要能夠分辨是哪一個語言(data type1 or data type2 ?)
我為了能夠正確解析對方的內容,要走對的翻譯流程(serilaize / deserilaize)
所以socket 要與data type做連結

a.建立一個以zmq socket entity 及 serilaize / deserilaize 的 function pointer為member的struct 

b.包裝create publisher及subscriber funciton

bind serilaize / deserilaize function pointer

c.包裝read/write function

計算要送的data 總共需要的bytes
serilaize 並copy到bytes buffer送出
使用function pointer及void type達到多型的應用
typedef int (*serialize_fn)(uint8_t *, void *);
typedef void (*deserialize_fn)(uint8_t *, void **, int);
typedef struct ipc_entity
{
    void *entity;
    serialize_fn serialize;
    deserialize_fn deserialize;
} ipc_entity;
int ipc_create_publisher(void *context, ipc_entity_t *zmq_pub, const char *addr, serialize_fn serialize)
{
    ipc_entity_t pub = malloc(sizeof(ipc_entity_t));
    pub->entity = zmq_socket(context, ZMQ_PUB);
    zmq_bind(pub->entity, addr);
    pub->serialize = serialize;
    *zmq_pub = pub;
}
int ipc_write(ipc_entity_t zmq_pub, void *sample, int flags)
{
    uint32_t _size = zmq_pub->topic->get_size(sample);
    uint8_t *buff = calloc(_size, 1);
    uint32_t size = zmq_pub->topic->serialize(buff, sample);
    uint32_t ret = zmq_send(zmq_pub->entity, buff, size, 0);
    free(buff);
    if (ret != _size)
        return -1;
    return ret;
}
int ipc_create_subscriber(void *context, ipc_entity_t *zmq_sub, const char *addr, deserialize_fn deserialize)
{
    ipc_entity_t sub = malloc(sizeof(ipc_entity_t));
    sub->entity = zmq_socket(context, ZMQ_SUB);
    zmq_connect(sub->entity, addr);
    zmq_setsockopt(sub->entity, ZMQ_SUBSCRIBE, "", 0);
    sub->deserialize = deserialize;
    *zmq_sub = sub;
}
int ipc_read(ipc_entity_t zmq_sub, void **sample, bool *valid_data)
{
    uint8_t *buff = calloc(_size, 1);
    int ret = zmq_recv(zmq_sub->entity, buff, _size, 0);
    if (ret != _size)
    {
        *valid_data = false;
        return -1;
    }
    *valid_data = true;
    void *temp;
    zmq_sub->deserialize(buff, &temp, ret);
    free(buff);
    *sample = temp;
    return ret;
}
所以我的大腦會變成
void main(void)
{
    ipc_create_publisher("電話","英國連線","電話號碼","英文翻譯");
    ipc_create_publisher("電話","台灣連線","電話號碼","中文翻譯");
    英文 payload1.msg = "hello";
    ipc_write("英國連線",payload1);
    中文 payload2.msg = "你好";
    ipc_write("台灣連線",payload2);
}

如果比對VortexLite API,是不是滿像的?
在DDS,Topic其中一個組成要素就是data type
所以在建立writer/ reader時 才要綁定一個Topic
IDL會被轉成一個.c和.h檔案,這兩個檔案就是用來描述data type
幫助底層的程式翻譯(serilaize / deserilaize)
DDS_EXPORT int dds_writer_create
(
  dds_entity_t pp_or_pub,
  dds_entity_t * writer,
  dds_entity_t topic,
  const dds_qos_t * qos,
  const dds_writerlistener_t * listener
);
DDS_EXPORT int dds_write (dds_entity_t wr, const void *data);
DDS_EXPORT int dds_reader_create
(
  dds_entity_t pp_or_sub,
  dds_entity_t * reader,
  dds_entity_t topic,
  const dds_qos_t * qos,
  const dds_readerlistener_t * listener
);
DDS_EXPORT int dds_read_cond
(
  dds_entity_t rd,
  void ** buf,
  uint32_t maxs,
  dds_sample_info_t * si,
  dds_condition_t cond
);

3.改善

在我臆測到VortexLite API的設計概念後
我也試著將VortexLite 所用到的程式技術應用到我這

a.pointer data type

有些user使用的type會是pointer組成,為了讓接收端能夠精確的反解該指標內容
前面會增加一個訊息表示該pointer的長度
例如 : char *msg = "hello"
前面會有兩個bytes描述 hello有5個字,後面再接著放hello這個內容
對照VortexLite,有個sequence data type用來處理指標
它是一個VortexLite定義的struct,同樣會有個length描述_buffer這個指標內容的長度
typedef struct sequence
{
  uint32_t _maximum;
  uint32_t _length;
  uint8_t (*_buffer);
  bool _release;
} sequence;

b.function pointer

這個方式在我以前很少用,但在建立library的時候,library的主流程能夠固定,然後讓子流程留給user定義,同時能夠讓source code boundary 更好
可以見ipc_write function 內呼叫serialize ,我將此處應用function pointer讓user可以自己定義自己的serialize 流程
因為不同的struct data,serialize 流程不同
同理,ipc_read 及 deserialize 一樣

c.hack struct member

學習直接用pointer 及 offset 做member處理 
因為我目前各data type的serialize / deserialize 需要寫code,增加不便性
目前像這樣
hello_sample.c
#include "hello_sample.h"
static void free_sequence(sequence *ptr)
{
    free(ptr->_buffer);
    ptr->_buffer = NULL;
}
void hello_free(hello_sample *sample)
{
    free_sequence(&sample->payload);
    free(sample);
    sample = NULL;
}
int hello_get_size(void *sample)
{
    hello_sample * this = (hello_sample *)sample;
    int size = 0;
    size += 4;
    size += 4;
    size += 8*this->payload._length;
    return size;
}
int hello_serialize(uint8_t *buff,void *sample)
{
    hello_sample * this = (hello_sample *)sample;
    int size = 0;
    buff = serialize_u32(buff,&this->id,1,&size);
    buff = serialize_u32(buff,&this->payload._length,1,&size);
    buff = serialize_f64(buff,this->payload._buffer,this->payload._length,&size);
    return size;
}
void hello_deserialize(uint8_t *buff,void **sample,int ret)
{
    hello_sample * this malloc(sizeof(hello_sample));
    int size = 0;
    buff = deserialize_u32(buff,&this->id,1,&size,ret);
    buff = deserialize_u32(buff,&this->payload._length,1,&size,ret);
    this->payload._buffer = calloc(this->payload._length,8);
    buff = deserialize_f64(buff,this->payload._buffer,this->payload._length,&size,ret);
    *sample = this;
}
ipc_topic_descriptor_t hello_descriptor =
{
    hello_get_size,
    hello_serialize,
    hello_deserialize
};
對照VortexLite 轉譯idl產生的.c檔
HelloWorldData_Msg_ops就是搭配offsetof 紀錄struct 的member 資訊
幫助VortexLite 在底下做 資料的處理
對使用者來說,也隱藏 serialize / deserialize 的實際流程 (我目前的方式是顯現的)
#include "HelloWorldData.h"
static const dds_key_descriptor_t HelloWorldData_Msg_keys[1] =
{
  "userID", 0 }
};
static const uint32_t HelloWorldData_Msg_ops [] =
{
  DDS_OP_ADR | DDS_OP_TYPE_4BY | DDS_OP_FLAG_KEY, offsetof (HelloWorldData_Msg, userID),
  DDS_OP_ADR | DDS_OP_TYPE_STR, offsetof (HelloWorldData_Msg, message),
  DDS_OP_RTS
};
const dds_topic_descriptor_t HelloWorldData_Msg_desc =
{
  sizeof (HelloWorldData_Msg),
  sizeof (char *),
  DDS_TOPIC_FIXED_KEY | DDS_TOPIC_NO_OPTIMIZE,
  1u,
  "HelloWorldData::Msg",
  HelloWorldData_Msg_keys,
  HelloWorldData_Msg_ops,
  "<MetaData version=\"1.0.0\"><Module name=\"HelloWorldData\"><Struct name=\"Msg\"><Member name=\"userID\"><Long/></Member><Member name=\"message\"><String/></Member></Struct></Module></MetaData>"
};

沒有留言:

張貼留言

NoSQL Redis intro

Redis是一個使用ANSI C編寫的開源、支援網路、基於記憶體、可選永續性的鍵值對儲存資料庫。 支援rdb 及aof 兩種儲存方式 From  https://zh.wikipedia.org/wiki/Redis Redis 目前擁有兩種資料...