You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
89 lines
3.1 KiB
Python
89 lines
3.1 KiB
Python
#
|
|
# Copyright (c) Contributors to the Open 3D Engine Project. For complete copyright and license terms please see the LICENSE at the root of this distribution.
|
|
#
|
|
# SPDX-License-Identifier: Apache-2.0 OR MIT
|
|
#
|
|
#
|
|
|
|
import argparse
|
|
import logging
|
|
import json
|
|
import socket
|
|
from datetime import datetime
|
|
|
|
SOCKET_TIMEOUT = 60
|
|
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
|
|
FILEBEAT_PIPELINE = "filebeat"
|
|
FILEBEAT_DEFAULT_IP = "127.0.0.1"
|
|
FILEBEAT_DEFAULT_PORT = 9000
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser(
|
|
prog="submit_metrics.py",
|
|
description="Pushes a JSON document via Filebeat.",
|
|
add_help=False
|
|
)
|
|
|
|
def file_arg(arg):
|
|
try:
|
|
with open(arg) as json_file:
|
|
return json.load(json_file)
|
|
except ValueError:
|
|
raise argparse.ArgumentTypeError("Invalid json file '%s'" % arg)
|
|
|
|
parser.add_argument("-f", "--file", default=None, type=file_arg, help="File containing JSON data to upload.")
|
|
parser.add_argument("-i", "--index", default=None, help="Index to use when sending the data")
|
|
parser.add_argument("-ip", "--filebeat_ip", default=FILEBEAT_DEFAULT_IP, help="IP address where filebeat service is listening")
|
|
parser.add_argument("-port", "--filebeat_port", default=FILEBEAT_DEFAULT_PORT, help="Port where filebeat service is listening")
|
|
return parser.parse_args()
|
|
|
|
def submit(index, payload, filebeat_ip = FILEBEAT_DEFAULT_IP, filebeat_port = FILEBEAT_DEFAULT_PORT):
|
|
try:
|
|
filebeat_address = filebeat_ip, filebeat_port
|
|
|
|
logging.debug(f"Connecting to Filebeat on '{filebeat_address[0]}:{filebeat_address[1]}'")
|
|
fb_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
fb_socket.settimeout(SOCKET_TIMEOUT)
|
|
fb_socket.connect(filebeat_address)
|
|
|
|
event = {
|
|
"index": index,
|
|
"timestamp": datetime.strptime(payload['timestamp'], DATE_FORMAT).strftime(DATE_FORMAT),
|
|
"pipeline": FILEBEAT_PIPELINE,
|
|
"payload": json.dumps(payload),
|
|
}
|
|
|
|
# Serialise event, add new line and encode as UTF-8 before sending to Filebeat.
|
|
data = json.dumps(event) + "\n"
|
|
data = data.encode()
|
|
|
|
total_sent = 0
|
|
|
|
logging.debug(f"Sending JSON data")
|
|
while total_sent < len(data):
|
|
try:
|
|
sent = fb_socket.send(data[total_sent:])
|
|
except BrokenPipeError:
|
|
print("An exception occurred while sending data")
|
|
fb_socket.close()
|
|
total_sent = 0
|
|
else:
|
|
total_sent = total_sent + sent
|
|
logging.debug("JSON data sent")
|
|
fb_socket.close()
|
|
logging.debug(f"Disconnected from Filebeat on '{filebeat_address[0]}:{filebeat_address[1]}'")
|
|
except (ConnectionError, socket.timeout):
|
|
logging.error("Failed to connect to Filebeat")
|
|
return False
|
|
return True
|
|
|
|
if __name__ == "__main__":
|
|
# Parse CLI arguments.
|
|
args = parse_args()
|
|
if not args.index:
|
|
logging.error(f"Index not specified")
|
|
exit(1)
|
|
|
|
if not submit(args.index, json.dumps(args.file), args.filebeat_ip, args.filebeat_port):
|
|
exit(1)
|