1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
18 #include <ipcencoding.h>
19 #include <callbackregistries.h>
20 #include <clearinghouseregistries.h>
21 #include <lappregistries.h>
28 gs_uint64_t shared_memory_full_warning =0;
30 static gs_int32_t maxsnaplen = 0;
32 struct FTAID clearinghouseftaid;
34 gs_uint64_t intupledrop=0;
35 gs_uint64_t outtupledrop=0;
36 gs_uint64_t intuple=0;
37 gs_uint64_t outtuple=0;
38 gs_uint64_t inbytes=0;
39 gs_uint64_t outbytes=0;
44 /* following function is internal and defined in lappinterface.c */
45 gs_retval_t ipc_call_and_wait(FTAID f, gs_sp_t msg, gs_sp_t result);
47 gs_retval_t gscp_blocking_mode() {
48 #ifdef BLOCKRINGBUFFER
55 static void clock_signal_check() {
57 static gs_int32_t t=0;
62 if (ftaexec_start()<0) {
63 gslog(LOG_EMERG,"GSCPRTS::error::could not init check of "
67 while ((fa=ftaexec_next())!=0) {
68 if (fa->clock_fta!=0) {
73 if (t%GSLOGINTERVAL==0) gsstats();// log all the stats
78 static gs_retval_t send_standard_reply(FTAID f, gs_int32_t result) {
79 struct standard_result r;
80 r.h.callid=STANDARD_RESULT;
81 r.h.size=sizeof(struct standard_result);
83 if (gscpipc_send(f, FTACALLBACK, (gs_sp_t) &r,r.h.size,1)<0) {
84 gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
90 static gs_retval_t send_lookup_reply(FTAID f, gs_int32_t result,
93 struct ftafind_result r;
94 r.h.callid=FTAFIND_RESULT;
95 r.h.size=sizeof(struct ftafind_result);
99 if (strlen(*schema)>=(MAXSCHEMASZ-1)) {
100 gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",(unsigned char *)schema);
103 strcpy(r.schema,*schema);
106 if (gscpipc_send(f, FTACALLBACK, (gs_sp_t) &r,r.h.size,1)<0) {
107 gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
114 static gs_retval_t send_fta_result(FTAID f,
115 FTAID * ftaid, gs_int32_t result) {
117 r.h.callid=FTA_RESULT;
118 r.h.size=sizeof(struct fta_result);
123 if (gscpipc_send(f, FTACALLBACK,(gs_sp_t)&r,r.h.size,1)<0) {
124 gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
131 // Is also used by the lfta rts enviroment on a post. So make it none
133 gs_retval_t send_wakeup(FTAID f)
135 struct wakeup_result a;
138 a.h.size=sizeof(struct wakeup_result);
139 if (gscpipc_send(f, FTACALLBACK, (gs_sp_t)&a,a.h.size,0)<0) {
140 gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
146 static gs_retval_t fta_register_instance(FTAID subscriber,
147 FTAID f,gs_uint32_t reusable,
150 gs_int8_t rb[MAXRES];
151 struct fta_register_arg a;
152 struct standard_result * sr = (struct standard_result *)rb;
154 if (curprocess.type != CLEARINGHOUSE) {
155 a.h.callid = FTA_REGISTER;
156 a.h.size = sizeof(struct fta_register_arg);
157 if (strlen(name)>=(MAXFTANAME-1)) {
158 gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
161 if (strlen(schema)>=(MAXSCHEMASZ-1)) {
162 gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",schema);
166 strcpy(a.schema,schema);
168 a.subscriber=subscriber; /* consumer is the same as f for an FTA*/
170 ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
171 if (sr->h.callid != STANDARD_RESULT) {
172 gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
175 if (sr->result != 0) {
179 if (ftalookup_register_fta(subscriber,f,name,reusable,schema)<0) {
186 static gs_retval_t fta_unregister_instance(FTAID subscriber,
188 gs_int8_t rb[MAXRES];
189 struct fta_unregister_arg a;
190 struct standard_result * sr = (struct standard_result *)rb;
192 if (curprocess.type != CLEARINGHOUSE) {
193 a.h.callid = FTA_UNREGISTER;
194 a.h.size = sizeof(struct fta_register_arg);
196 a.subscriber=subscriber; /* consumer is the same as f for an FTA*/
197 ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
198 if (sr->h.callid != STANDARD_RESULT) {
199 gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
204 if (ftalookup_unregister_fta(f,subscriber)<0) {
211 gs_retval_t fta_start_service(gs_int32_t number)
213 gs_int8_t buf[MAXMSGSZ];
216 struct hostcall * h= (struct hostcall *) buf;
217 gs_int32_t forever=0;
218 gs_int32_t endtime=0;
240 endtime=time(0)+number;
244 (endtime==0)||(endtime>time(0))) {
245 /* check if we need to give the FTAs there clock signal */
246 if ((curprocess.type == LFTA)
247 ||(curprocess.type == HFTA)) {
248 clock_signal_check();
255 /* first empty out sidequeu then read from messagequeue */
256 if (sidequeue_pop(&from,buf,&length)<0) {
257 /* empty out the sidequeue before processing the shared
259 if (curprocess.type == HFTA) {
260 /* process all the shared memory regions and register
264 streamregistry_getactiveringbuf_reset();
265 while ((r=streamregistry_getactiveringbuf())>0) {
268 if (ftaexec_start()<0) {
269 gslog(LOG_EMERG,"GSCPRTS::error::could not init check of "
273 while ((fa=ftaexec_next())!=0) {
275 for(x=0;x<fa->stream_subscribed_cnt;x++) {
276 if ((fa->stream_subscribed[x].streamid
277 ==CURREAD(r)->f.streamid)
278 && (fa->stream_subscribed[x].ip
280 && (fa->stream_subscribed[x].port
281 ==CURREAD(r)->f.port)) {
282 fa->accept_packet(fa,&(CURREAD(r)->f),
283 &(CURREAD(r)->data[0]),
289 inbytes+=CURREAD(r)->sz;
291 if (endq <= time(0)) {
300 /* register wakeups all arround to make sure we don't sleep
303 streamregistry_getactiveftaid_reset();
304 while ((ftaidp=streamregistry_getactiveftaid())>0) {
305 struct gscp_get_buffer_arg a;
306 a.h.callid = GSCP_GET_BUFFER;
307 a.h.size = sizeof(struct gscp_get_buffer_arg);
309 if (gscpipc_send(*ftaidp,FTACALLBACK,(gs_sp_t )&a,a.h.size,1)<0) {
317 /* even if we block we return every 100msec to be able to generate the clock signal to
320 if ((res=gscpipc_read(&from,&lopp,buf,&length,((block==1)&&(preemptq==0))?2:0))<0) {
321 gslog(LOG_EMERG,"GSCPRTS::error::reading from messagequeue\n");
324 /* check if we need to give the FTAs there clock signal */
325 if ((curprocess.type == LFTA)
326 ||(curprocess.type == HFTA)) {
327 clock_signal_check();
329 if ((res==0) && (block==0)) {
330 /* nonblocking and nothing to do so return */
333 if ((res==0) && (endtime!=0) && (endtime<time(0))) {
334 /* timeout reached so return */
338 if ((res==0)&&(curprocess.type == HFTA)) {
343 if ((lopp)!=FTACALLBACK) {
344 gslog(LOG_EMERG,"GSCPRTS::error::unknown lowlevel opp\n");
350 gslog(LOG_EMERG, "HFTA message from %u of type %u of length %u\n",from,
356 if (curprocess.type == CLEARINGHOUSE) {
357 struct fta_find_arg * n;
360 n = (struct fta_find_arg *) buf;
361 /* Note: Side effect of ftalookup_lookup_fta_index is to
362 fill msgid and index */
363 if (send_lookup_reply(from,
364 ftalookup_lookup_fta_index(from,
370 (gs_sp_t *)&schema)<0) {
371 gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
375 if (send_standard_reply(from,-1)<0) {
376 gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
379 gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"
380 "contacted for clearinghouse processing\n");
385 if (curprocess.type == CLEARINGHOUSE) {
386 struct fta_register_arg * n;
387 n = (struct fta_register_arg *) buf;
388 if (send_standard_reply(from,
389 ftalookup_register_fta(n->subscriber,
394 gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
398 if (send_standard_reply(from,-1)<0) {
399 gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
402 gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"
403 "contacted for clearinghouse processing\n");
407 case FTA_UNREGISTER: {
408 if (curprocess.type == CLEARINGHOUSE) {
409 struct fta_unregister_arg * n;
410 n = (struct fta_unregister_arg *) buf;
411 if (send_standard_reply(from,
412 ftalookup_unregister_fta(n->subscriber,
414 gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
418 if (send_standard_reply(from,-1)<0) {
419 gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
422 gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"
423 "contacted for clearinghouse processing\n");
427 case FTA_ALLOC_INSTANCE:
428 case FTA_ALLOC_PRINT_INSTANCE:
429 if ((curprocess.type == LFTA)
430 ||(curprocess.type == HFTA)) {
432 struct fta_alloc_instance_arg * n;
433 n = (struct fta_alloc_instance_arg *) buf;
434 if ((fta=ftaexec_alloc_instance(n->f.index,
435 (struct FTA *)n->f.streamid,
439 &(n->data[0])))==0) {
440 gslog(LOG_EMERG,"GSCPRTS::warning::could not allocate"
442 if (send_fta_result(from,0,-1)<0) {
443 gslog(LOG_EMERG,"GSCPRTS::error::send standard "
448 /* shared memory is only required if data is beeing transfered */
449 if ((h->callid!=FTA_ALLOC_PRINT_INSTANCE)&&
450 ((r=gscpipc_getshm(from))==0)) {
451 gslog(LOG_EMERG,"GSCPRTS::warning::could not get"
454 if (send_fta_result(from,0,-1)<0) {
455 gslog(LOG_EMERG,"GSCPRTS::error::send standard "
460 /* no callback to register for print function */
461 if ((h->callid!=FTA_ALLOC_PRINT_INSTANCE)&&(ftacallback_add_streamid(r,fta->ftaid.streamid)!=0)) {
462 gslog(LOG_EMERG,"GSCPRTS::warning::could not add"
463 "streamid to ringbuffer\n");
464 ftaexec_free_instance(fta,1);
465 if (send_fta_result(from,0,-1)<0) {
466 gslog(LOG_EMERG,"GSCPRTS::error::send standard "
471 if (ftaexec_insert(0,fta)<0) {
472 gslog(LOG_EMERG,"GSCPRTS::warning::could not"
474 ftacallback_rm_streamid(r,fta->ftaid.streamid);
475 ftaexec_free_instance(fta,1);
476 if (send_fta_result(from,0,-1)<0) {
477 gslog(LOG_EMERG,"GSCPRTS::error::send standard "
482 if (fta_register_instance(n->subscriber,fta->ftaid,
486 gslog(LOG_EMERG,"GSCPRTS::warning::could not register"
489 ftacallback_rm_streamid(r,fta->ftaid.streamid);
490 ftaexec_free_instance(fta,1);
491 if (send_fta_result(from,0,-1)<0) {
492 gslog(LOG_EMERG,"GSCPRTS::error::send standard "
497 if (h->callid==FTA_ALLOC_PRINT_INSTANCE) {
498 if (curprocess.type == LFTA) {
499 gslog(LOG_EMERG,"GSCPRTS::error:: alloc print instance not "
500 "implemented for LFTA.\n");
502 ftaexec_free_instance(fta,1);
503 if (send_fta_result(from,0,-1)<0) {
504 gslog(LOG_EMERG,"GSCPRTS::error::send standard "
509 if (add_printfunction_to_stream(fta, n->schema, n->path, n->basename,
510 n->temporal_field, n->split_field, n->delta, n->split) < 0) {
512 ftaexec_free_instance(fta,1);
513 if (send_fta_result(from,0,-1)<0) {
514 gslog(LOG_EMERG,"GSCPRTS::error::send standard "
521 if (send_fta_result(from,&fta->ftaid,0)<0) {
522 gslog(LOG_EMERG,"GSCPRTS::error::send standard "
532 if (send_fta_result(from,0,-1)<0) {
533 gslog(LOG_EMERG,"GSCPRTS::error::send standard "
539 case FTA_FREE_INSTANCE:{
540 if ((curprocess.type == LFTA)
541 ||(curprocess.type == HFTA)) {
542 struct fta_free_instance_arg * n;
543 n = (struct fta_free_instance_arg *) buf;
544 if (((r=gscpipc_getshm(from))!=0)
545 && ( ftaexec_remove((struct FTA *) n->f.streamid)==0)
546 && ( ftacallback_rm_streamid(r,n->f.streamid)==0)
547 && (ftaexec_free_instance((struct FTA *)n->f.streamid,
550 && (fta_unregister_instance(n->subscriber,n->f)<0)) {
551 if (send_standard_reply(from,0)<0) {
552 gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
556 if (send_standard_reply(from,-1)<0) {
557 gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
562 if (send_standard_reply(from,-1)<0) {
563 gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
570 if ((curprocess.type == LFTA)
571 ||(curprocess.type == HFTA)) {
572 struct fta_control_arg * n;
573 n = (struct fta_control_arg *) buf;
574 if (send_standard_reply(from,
575 ftaexec_control((struct FTA *)
580 gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
584 if (send_standard_reply(from,-1)<0) {
585 gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
588 gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse or HFTA proccess got"
589 "contacted for clearinghouse or HFTA processing\n");
593 case FTA_PRODUCER_FAILURE: {
594 if (curprocess.type == CLEARINGHOUSE) {
595 struct fta_notify_producer_failure_arg * n;
596 n = (struct fta_notify_producer_failure_arg *) buf;
597 ftalookup_producer_failure(n->sender,n->producer);
601 case FTA_HEARTBEAT: {
602 if (curprocess.type == CLEARINGHOUSE) {
603 struct fta_heartbeat_arg * n;
604 n = (struct fta_heartbeat_arg *) buf;
605 ftalookup_heartbeat(n->sender,n->trace_id,
606 n->sz,&(n->data[0]));
610 case GSCP_GET_BUFFER:{
611 struct sgroup_get_buffer_arg * n;
614 n = (struct sgroup_get_buffer_arg *) buf;
616 if ((r=gscpipc_getshm(from))==0) {
617 gslog(LOG_EMERG,"GSCPRTS::error::proccess blocked without"
621 /* something arrived in the meantime so wakeup
623 if (send_wakeup(from)<0) {
624 gslog(LOG_EMERG,"ERROR:Could not send wakeup\n");
629 if (ftacallback_add_wakeup(from,r)<0) {
630 gslog(LOG_EMERG,"ERROR:Could not add wakeup\n");
634 gslog(LOG_EMERG,"Received wakeup request on polling systems\n");
640 case PROCESS_CONTROL:{
641 if ((curprocess.type == LFTA)
642 ||(curprocess.type == HFTA)) {
643 struct process_control_arg * n;
644 n = (struct process_control_arg *) buf;
645 if (send_standard_reply(from,
646 ftaexec_process_control(n->command,
649 gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
653 if (send_standard_reply(from,-1)<0) {
654 gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
657 gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse or HFTA proccess got"
658 "contacted for clearinghouse or HFTA processing\n");
666 gslog(LOG_EMERG,"GSCPRTS::error::illegal message queue type %u\n",h->callid);
669 /* use this occation to cleanup the messagequeue we can't afford
670 a backlog in the real message queue since it is limited in
672 while (gscpipc_read(&from,&lopp,buf,&length,0)>0) {
674 gslog(LOG_EMERG, "request from %u of type %u with length %u\n",from,
677 if ((lopp == FTACALLBACK) && (h->callid < RESULT_OPCODE_IGNORE)) {
678 if (sidequeue_append(from,buf,length)<0) {
679 gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");
689 static gs_retval_t map_match(gs_csp_t dev) {
691 for(x=0;x<curprocess.mapcnt;x++){
692 if (strcmp(dev,curprocess.map[x])==0) {
699 gs_retval_t fta_max_snaplen()
704 FTAID fta_register(FTAname name,gs_uint32_t reusable, DEVname dev,
705 alloc_fta fta_alloc_functionptr,
706 gs_csp_t schema, gs_int32_t snaplen, gs_uint64_t prefilter)
708 gs_int8_t rb[MAXRES];
709 struct fta_register_arg a;
710 struct standard_result * sr = (struct standard_result *)rb;
720 /* check if the device matches for the registration */
721 if (((dev==0) && (curprocess.deviceid==0))
722 || ((dev!=0)&&(map_match(dev)==1))) {
724 gslog(LOG_INFO,"Register %s on device %s\n",name,dev);
726 gslog(LOG_INFO,"Register %s on default device\n",name);
728 if ((index=ftacallback_add_alloc(name,fta_alloc_functionptr,prefilter))<0) {
729 gslog(LOG_EMERG,"ERROR could not register callback\n");
736 if (maxsnaplen!=-1) {
737 maxsnaplen=(snaplen>maxsnaplen)?snaplen:maxsnaplen;
741 res=gscpipc_getftaid();
743 if (curprocess.type != CLEARINGHOUSE) {
744 a.h.callid = FTA_REGISTER;
745 a.h.size = sizeof(struct fta_register_arg);
746 if (strlen(name)>=(MAXFTANAME-1)) {
747 gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
750 if (strlen(schema)>=(MAXSCHEMASZ-1)) {
751 gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",schema);
755 strcpy(a.schema,schema);
757 a.subscriber=res; /* consumer is the same as f for an FTA*/
759 ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
760 if (sr->h.callid != STANDARD_RESULT) {
761 gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
764 if (sr->result != 0 ) {
765 gslog(LOG_EMERG,"ERROR:Error in registration\n");
769 if (ftalookup_register_fta(gscpipc_getftaid(),
770 res,name,reusable,schema)!=0) {
778 gs_retval_t fta_unregister(FTAID ftaid)
780 gs_int8_t rb[MAXRES];
781 struct fta_unregister_arg a;
782 struct standard_result * sr = (struct standard_result *)rb;
783 if (ftacallback_rm_alloc(ftaid.index)<0) {
784 gslog(LOG_EMERG,"ERROR could not unregister callback\n");
788 if (curprocess.type != CLEARINGHOUSE) {
789 a.h.callid = FTA_UNREGISTER;
790 a.h.size = sizeof(struct fta_register_arg);
792 ipc_call_and_wait(clearinghouseftaid,(gs_sp_t) &a,rb);
793 if (sr->h.callid != STANDARD_RESULT) {
794 gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
799 return ftalookup_unregister_fta(gscpipc_getftaid(),ftaid);
804 gs_retval_t hfta_post_tuple(struct FTA * self, gs_int32_t sz, void *tuple)
808 struct wakeup_result a;
813 gslog(LOG_EMERG,"Maximum tuple size is %u\n",MAXTUPLESZ);
817 if (self->printfunc.in_use==1) {
818 return print_stream(self,sz,tuple);
821 if (ftacallback_start_streamid((gs_p_t )self)<0) {
822 gslog(LOG_EMERG,"ERROR:Post for unkown streamid\n");
825 /* now make sure we have space to write in all atomic ringbuffer */
826 while((r=ftacallback_next_streamid(&state))!=0) {
827 if (state == HFTA_RINGBUF_ATOMIC) {
828 #ifdef BLOCKRINGBUFFER
829 while (!SPACETOWRITE(r)) {
833 if (! SPACETOWRITE(r)) {
834 /* atomic ring buffer and no space so post nothing */
841 if (ftacallback_start_streamid((gs_p_t )self)<0) {
842 gslog(LOG_EMERG,"ERROR:Post for unkown streamid\n");
846 while((r=ftacallback_next_streamid(&state))!=0) {
847 if (state != HFTA_RINGBUF_SUSPEND) {
848 if (!SPACETOWRITE(r)) {
849 //since memory is full we set a warning
850 shared_memory_full_warning++;
851 // give receiver a chance to clean up
854 if (SPACETOWRITE(r)) {
855 CURWRITE(r)->f=self->ftaid;
857 memcpy(&(CURWRITE(r)->data[0]),tuple,sz);
859 outbytes=outbytes+CURWRITE(r)->sz;
862 gslog(LOG_EMERG,"Wrote in ringpuffer %p [%p:%u]"
863 "(%u %u) \n",r,&r->start,r->end,r->reader,r->writer);
864 gslog(LOG_EMERG,"\t%u %u\n",CURREAD(r)->next,
870 if (HOWFULL(r) > 500) {
871 // buffer is at least half full
872 shared_memory_full_warning++;
874 gslog(LOG_EMERG,"\t\t buffer full\n");
880 if (ftacallback_start_wakeup((gs_p_t ) self)<0) {
881 gslog(LOG_EMERG,"ERROR:Wakeup for unkown streamid\n");
885 a.h.size=sizeof(struct wakeup_result);
886 while((f=ftacallback_next_wakeup())!=0) {
887 if (send_wakeup(*f)<0) {
888 gslog(LOG_EMERG,"ERROR:Could not send wakeup\n");
896 gs_retval_t hfta_get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t * space, gs_int32_t szr, gs_int32_t tuplesz)
902 if (f->printfunc.in_use==1) {
904 gslog(LOG_INFO,"Checking space for printfunc");
908 if (ftacallback_start_streamid(f->ftaid.streamid)<0) {
909 gslog(LOG_EMERG,"ERROR:Space check for unkown streamid in HFTA\n");
913 while ((ru=ftacallback_next_streamid(&state))!=0) {
916 space[x]=TUPLEFIT(ru,tuplesz);
924 gs_retval_t hfta_set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t state)
927 if (ftacallback_state_streamid(f->ftaid.streamid,process,state)<0) {
928 gslog(LOG_EMERG,"ERROR:state change for unkown streamid\n");