+type pollingAgent interface {
+ pollMessages() ([]byte, error)
+}
+
+func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory, topicID string) pollingAgent {
+ if typeDef.DMaaPTopicURL != "" {
+ return dMaaPPollingAgent{
+ messageRouterURL: mRAddress + typeDef.DMaaPTopicURL,
+ pollClient: pollClient,
+ }
+ } else {
+ return newKafkaPollingAgent(kafkaFactory, typeDef.KafkaInputTopic)
+ }
+}
+
+type dMaaPPollingAgent struct {
+ messageRouterURL string
+ pollClient restclient.HTTPClient
+}
+
+func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) {
+ return restclient.Get(pa.messageRouterURL, pa.pollClient)
+}
+
+type kafkaPollingAgent struct {
+ kafkaClient kafkaclient.KafkaClient
+}
+
+func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent {
+ c, err := kafkaclient.NewKafkaClient(kafkaFactory, topicID)
+ if err != nil {
+ log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v\n", topicID, err)
+ }
+ return kafkaPollingAgent{
+ kafkaClient: c,
+ }
+}
+
+func (pa kafkaPollingAgent) pollMessages() ([]byte, error) {
+ msg, err := pa.kafkaClient.ReadMessage()
+ if err == nil {
+ return msg, nil
+ } else {
+ if isKafkaTimedOutError(err) {
+ return []byte(""), nil
+ }
+ return nil, err
+ }
+}
+
+func isKafkaTimedOutError(err error) bool {
+ kafkaErr, ok := err.(kafka.Error)
+ return ok && kafkaErr.Code() == kafka.ErrTimedOut
+}
+