我正在尝试在Linux平台上探索协议缓冲区(PB),我的编码语言是C ++。我在协议缓冲区在线文档中找到了示例,但没有特定于套接字发送和接收的内容(或者我完全错过了:))。因此,我决定在实际消息之前添加消息Length,并通过套接字发送它。如果有人可以提出比我计划做的更好的解决方案,并且在PB中已经准备好任何用于创建此类数据包的东西,我将不胜感激。
但是我仍然在服务器端遇到问题,必须对数据包进行解码。假设客户端发送了一个10字节的数据包,其中前4个字节是该数据包的长度;但是不可能在解码分组之前知道长度。因此,即使我读取了第一个4字节,也如何使用Protocol Buffer推断一半读取数据包的值。
最后我可以使它工作了。我将代码发布在这里,以便人们可以对其进行评论和评论,以及是否有人想要在c ++中实现它,这部分代码可以提供帮助。我的初衷是为了使Protobuf以长度前缀的方式工作,这是一个简陋的代码。我从某个我不记得的站点上获取了客户端服务器的代码,并对其进行了修改以适应protobuf。在这里,服务器首先窥视套接字并获取总包的长度,然后进行实际的套接字读取以读取整个包。可能有无数种方法可以做到,但是为了快速解决,我以这种方式做到了。但是我需要找到一种更好的方法来避免每个数据包2个recv,但是在我的情况下,所有消息的大小都是不同的,所以这是我猜测的唯一方法。
原始文件
message log_packet { required fixed64 log_time =1; required fixed32 log_micro_sec =2; required fixed32 sequence_no =3; required fixed32 shm_app_id =4; required string packet_id =5; required string log_level=6; required string log_msg=7; }
协议缓冲区客户端代码
#include <unistd.h> #include "message.pb.h" #include <iostream> #include <google/protobuf/message.h> #include <google/protobuf/descriptor.h> #include <google/protobuf/io/zero_copy_stream_impl.h> #include <google/protobuf/io/coded_stream.h> #include <google/protobuf/io/zero_copy_stream_impl_lite.h> using namespace google::protobuf::io; using namespace std; int main(int argv, char** argc){ /* Coded output stram */ log_packet payload ; payload.set_log_time(10); payload.set_log_micro_sec(10); payload.set_sequence_no(1); payload.set_shm_app_id(101); payload.set_packet_id("TST"); payload.set_log_level("DEBUG"); payload.set_log_msg("What shall we say then"); cout<<"size after serilizing is "<<payload.ByteSize()<<endl; int siz = payload.ByteSize()+4; char *pkt = new char [siz]; google::protobuf::io::ArrayOutputStream aos(pkt,siz); CodedOutputStream *coded_output = new CodedOutputStream(&aos); coded_output->WriteVarint32(payload.ByteSize()); payload.SerializeToCodedStream(coded_output); int host_port= 1101; char* host_name="127.0.0.1"; struct sockaddr_in my_addr; char buffer[1024]; int bytecount; int buffer_len=0; int hsock; int * p_int; int err; hsock = socket(AF_INET, SOCK_STREAM, 0); if(hsock == -1){ printf("Error initializing socket %d\n",errno); goto FINISH; } p_int = (int*)malloc(sizeof(int)); *p_int = 1; if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )|| (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){ printf("Error setting options %d\n",errno); free(p_int); goto FINISH; } free(p_int); my_addr.sin_family = AF_INET ; my_addr.sin_port = htons(host_port); memset(&(my_addr.sin_zero), 0, 8); my_addr.sin_addr.s_addr = inet_addr(host_name); if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){ if((err = errno) != EINPROGRESS){ fprintf(stderr, "Error connecting socket %d\n", errno); goto FINISH; } } for (int i =0;i<10000;i++){ for (int j = 0 ;j<10;j++) { if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 ) { fprintf(stderr, "Error sending data %d\n", errno); goto FINISH; } printf("Sent bytes %d\n", bytecount); usleep(1); } } delete pkt; FINISH: close(hsock); }
协议缓冲区服务器代码
#include <fcntl.h> #include <string.h> #include <stdlib.h> #include <errno.h> #include <stdio.h> #include <netinet/in.h> #include <resolv.h> #include <sys/socket.h> #include <arpa/inet.h> #include <unistd.h> #include <pthread.h> #include "message.pb.h" #include <iostream> #include <google/protobuf/io/coded_stream.h> #include <google/protobuf/io/zero_copy_stream_impl.h> using namespace std; using namespace google::protobuf::io; void* SocketHandler(void*); int main(int argv, char** argc){ int host_port= 1101; struct sockaddr_in my_addr; int hsock; int * p_int ; int err; socklen_t addr_size = 0; int* csock; sockaddr_in sadr; pthread_t thread_id=0; hsock = socket(AF_INET, SOCK_STREAM, 0); if(hsock == -1){ printf("Error initializing socket %d\n", errno); goto FINISH; } p_int = (int*)malloc(sizeof(int)); *p_int = 1; if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )|| (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){ printf("Error setting options %d\n", errno); free(p_int); goto FINISH; } free(p_int); my_addr.sin_family = AF_INET ; my_addr.sin_port = htons(host_port); memset(&(my_addr.sin_zero), 0, 8); my_addr.sin_addr.s_addr = INADDR_ANY ; if( bind( hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){ fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno); goto FINISH; } if(listen( hsock, 10) == -1 ){ fprintf(stderr, "Error listening %d\n",errno); goto FINISH; } //Now lets do the server stuff addr_size = sizeof(sockaddr_in); while(true){ printf("waiting for a connection\n"); csock = (int*)malloc(sizeof(int)); if((*csock = accept( hsock, (sockaddr*)&sadr, &addr_size))!= -1){ printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr)); pthread_create(&thread_id,0,&SocketHandler, (void*)csock ); pthread_detach(thread_id); } else{ fprintf(stderr, "Error accepting %d\n", errno); } } FINISH: ;//oops } google::protobuf::uint32 readHdr(char *buf) { google::protobuf::uint32 size; google::protobuf::io::ArrayInputStream ais(buf,4); CodedInputStream coded_input(&ais); coded_input.ReadVarint32(&size);//Decode the HDR and get the size cout<<"size of payload is "<<size<<endl; return size; } void readBody(int csock,google::protobuf::uint32 siz) { int bytecount; log_packet payload; char buffer [siz+4];//size of the payload and hdr //Read the entire buffer including the hdr if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1){ fprintf(stderr, "Error receiving data %d\n", errno); } cout<<"Second read byte count is "<<bytecount<<endl; //Assign ArrayInputStream with enough memory google::protobuf::io::ArrayInputStream ais(buffer,siz+4); CodedInputStream coded_input(&ais); //Read an unsigned integer with Varint encoding, truncating to 32 bits. coded_input.ReadVarint32(&siz); //After the message's length is read, PushLimit() is used to prevent the CodedInputStream //from reading beyond that length.Limits are used when parsing length-delimited //embedded messages google::protobuf::io::CodedInputStream::Limit msgLimit = coded_input.PushLimit(siz); //De-Serialize payload.ParseFromCodedStream(&coded_input); //Once the embedded message has been parsed, PopLimit() is called to undo the limit coded_input.PopLimit(msgLimit); //Print the message cout<<"Message is "<<payload.DebugString(); } void* SocketHandler(void* lp){ int *csock = (int*)lp; char buffer[4]; int bytecount=0; string output,pl; log_packet logp; memset(buffer, '\0', 4); while (1) { //Peek into the socket and get the packet size if((bytecount = recv(*csock, buffer, 4, MSG_PEEK))== -1){ fprintf(stderr, "Error receiving data %d\n", errno); }else if (bytecount == 0) break; cout<<"First read byte count is "<<bytecount<<endl; readBody(*csock,readHdr(buffer)); } FINISH: free(csock); return 0; }