diff --git a/asyncgelf/asyncgelf.py b/asyncgelf/asyncgelf.py index 7935579..2b40033 100644 --- a/asyncgelf/asyncgelf.py +++ b/asyncgelf/asyncgelf.py @@ -129,6 +129,7 @@ async def tcp_handler(self, message, timestamp: Optional = None): :return: Exception """ messages = [] + timestamps = [] """ Input type checking """ @@ -137,6 +138,12 @@ async def tcp_handler(self, message, timestamp: Optional = None): else: messages = message + + if type(timestamp) is list: + timestamps = timestamp + + + try: if self.tls: @@ -157,7 +164,9 @@ async def tcp_handler(self, message, timestamp: Optional = None): return getattr(e, 'message', repr(e)) - for message in messages: + for i,message in enumerate(messages): + if len(timestamps) > 0: + timestamp = timestamps[i] gelf_message = GelfBase.make(self, message, timestamp) """ Transforming GELF dictionary into bytes """ bytes_msg = json.dumps(gelf_message).encode('utf-8') @@ -176,7 +185,7 @@ class GelfHttp(GelfBase): async def http_handler(self, message, timestamp: Optional = None): """ http handler for send logs to Graylog Input with type: gelf http - :param message: input message + :param message: message to send, can be list, str, dict :param timestamp: event timestamp in the format: seconds since UNIX epoch with optional decimal places for milliseconds :return: http status code @@ -185,10 +194,39 @@ async def http_handler(self, message, timestamp: Optional = None): 'Content-Type': 'application/json', } - if self.compress: - header.update({'Content-Encoding': 'gzip,deflate'}) + messages = [] + timestamps = [] + bulk_messages = "" + """ + Input type checking + """ + if type(message) is not list: + messages.append(message) - gelf_message = GelfBase.make(self, message, timestamp) + else: + messages = message + + if type(timestamp) is list: + timestamps = timestamp + + for i,message in enumerate(messages): + if len(timestamps) > 0: + timestamp = timestamps[i] + gelf_message = GelfBase.make(self, message, timestamp) + # In case of multiple messages send them in bulk + if len(messages) > 1: + # Convert the gelf_message dictionary to a JSON string and append it to bulk_message + """ + Warning: Individual GELF messages must be formatted as a valid JSON (containing no line breaks within). Attempts to post formatted JSON to this input will result in an error. + """ + bulk_messages+=json.dumps(gelf_message).replace('\n', ' ').replace('\r', '') + "\r\n" + + if len(messages) > 1: + # Remove the last "\r\n" from bulk_message + gelf_message = bulk_messages.rstrip("\r\n") + else: + # Dumping the gelf_message dictionary to a JSON string + gelf_message = json.dumps(gelf_message) if self.tls: ssl_contex = ssl.create_default_context() @@ -201,7 +239,7 @@ async def http_handler(self, message, timestamp: Optional = None): response = await client.post( gelf_endpoint, headers=header, - data=json.dumps(gelf_message), + data=gelf_message, ) return response.status_code @@ -219,7 +257,7 @@ async def http_handler(self, message, timestamp: Optional = None): response = await client.post( gelf_endpoint, headers=header, - data=json.dumps(gelf_message), + data=gelf_message, ) return response.status_code