Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions asyncgelf/asyncgelf.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ async def tcp_handler(self, message, timestamp: Optional = None):
:return: Exception
"""
messages = []
timestamps = []
"""
Input type checking
"""
Expand All @@ -137,6 +138,12 @@ async def tcp_handler(self, message, timestamp: Optional = None):

else:
messages = message

if type(timestamp) is list:
timestamps = timestamp




try:
if self.tls:
Expand All @@ -157,7 +164,9 @@ async def tcp_handler(self, message, timestamp: Optional = None):

return getattr(e, 'message', repr(e))

for message in messages:
for i,message in enumerate(messages):
if len(timestamps) > 0:
timestamp = timestamps[i]
gelf_message = GelfBase.make(self, message, timestamp)
""" Transforming GELF dictionary into bytes """
bytes_msg = json.dumps(gelf_message).encode('utf-8')
Expand All @@ -176,7 +185,7 @@ class GelfHttp(GelfBase):
async def http_handler(self, message, timestamp: Optional = None):
"""
http handler for send logs to Graylog Input with type: gelf http
:param message: input message
:param message: message to send, can be list, str, dict
:param timestamp: event timestamp in the format: seconds since UNIX epoch with optional decimal places for
milliseconds
:return: http status code
Expand All @@ -185,10 +194,39 @@ async def http_handler(self, message, timestamp: Optional = None):
'Content-Type': 'application/json',
}

if self.compress:
header.update({'Content-Encoding': 'gzip,deflate'})
messages = []
timestamps = []
bulk_messages = ""
"""
Input type checking
"""
if type(message) is not list:
messages.append(message)

gelf_message = GelfBase.make(self, message, timestamp)
else:
messages = message

if type(timestamp) is list:
timestamps = timestamp

for i,message in enumerate(messages):
if len(timestamps) > 0:
timestamp = timestamps[i]
gelf_message = GelfBase.make(self, message, timestamp)
# In case of multiple messages send them in bulk
if len(messages) > 1:
# Convert the gelf_message dictionary to a JSON string and append it to bulk_message
"""
Warning: Individual GELF messages must be formatted as a valid JSON (containing no line breaks within). Attempts to post formatted JSON to this input will result in an error.
"""
bulk_messages+=json.dumps(gelf_message).replace('\n', ' ').replace('\r', '') + "\r\n"

if len(messages) > 1:
# Remove the last "\r\n" from bulk_message
gelf_message = bulk_messages.rstrip("\r\n")
else:
# Dumping the gelf_message dictionary to a JSON string
gelf_message = json.dumps(gelf_message)

if self.tls:
ssl_contex = ssl.create_default_context()
Expand All @@ -201,7 +239,7 @@ async def http_handler(self, message, timestamp: Optional = None):
response = await client.post(
gelf_endpoint,
headers=header,
data=json.dumps(gelf_message),
data=gelf_message,
)

return response.status_code
Expand All @@ -219,7 +257,7 @@ async def http_handler(self, message, timestamp: Optional = None):
response = await client.post(
gelf_endpoint,
headers=header,
data=json.dumps(gelf_message),
data=gelf_message,
)

return response.status_code
Expand Down