InfoJobData Parameters `json:"info_job_data"`
InfoTypeIdentity string `json:"info_type_identity"`
sourceType sourceType
-}
+} // @name JobInfo
type JobTypesManager interface {
LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
type Parameters struct {
BufferTimeout BufferTimeout `json:"bufferTimeout"`
-}
+} // @name Parameters
type BufferTimeout struct {
MaxSize int `json:"maxSize"`
MaxTimeMiliseconds int64 `json:"maxTimeMiliseconds"`
-}
+} // @name BufferTimeout
func (j *job) start() {
if j.isJobBuffered() {
}
func getAsJSONArray(rawMsgs [][]byte) []byte {
- json := `"[`
+ if len(rawMsgs) == 0 {
+ return []byte("")
+ }
+ strings := ""
for i := 0; i < len(rawMsgs); i++ {
- msg := string(rawMsgs[i])
- json = json + strings.ReplaceAll(msg, "\"", "\\\"")
- if i < len(rawMsgs)-1 {
- json = json + ","
- }
+ strings = strings + makeIntoString(rawMsgs[i])
+ strings = addSeparatorIfNeeded(strings, i, len(rawMsgs))
+ }
+ return []byte(wrapInJSONArray(strings))
+}
+
+func makeIntoString(rawMsg []byte) string {
+ return `"` + strings.ReplaceAll(string(rawMsg), "\"", "\\\"") + `"`
+}
+
+func addSeparatorIfNeeded(strings string, position, length int) string {
+ if position < length-1 {
+ strings = strings + ","
}
- return []byte(json + `]"`)
+ return strings
+}
+
+func wrapInJSONArray(strings string) string {
+ return "[" + strings + "]"
}
func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {