Internet of Things Asked by Francisco Parrilla on August 23, 2021
I have a question about paho-mqtt and whether it is possible to create 10K clients using a multiprocessing-based approach.
This is my code so far (I'm using multiprocessing, with each process creating threads):
import multiprocessing
import paho.mqtt.client as mqtt
import time
import threading
import logging
import math
import thingsboard_objects as Things
import random
import datetime
import numpy as np
import sys
# Configure the root logger to emit records at INFO level and above.
logging.basicConfig(level=logging.INFO)
# Wall-clock timestamp (seconds since the epoch) captured when the module loads.
init_time = time.time()
# Module-level counter initialised to zero; nothing in the visible code updates it.
disconnected = 0
def Connect(client, broker, port, token, keepalive, run_forever=False,
            retry_delay=2, settle_delay=1):
    """Open the network connection to *broker*, retrying until it succeeds.

    Args:
        client: MQTT client object exposing ``username_pw_set`` and ``connect``.
        broker: Host name of the MQTT broker.
        port: TCP port of the broker.
        token: Device access token, passed as the MQTT user name.
        keepalive: Keep-alive interval in seconds handed to ``client.connect``.
        run_forever: Accepted for backward compatibility; the function always
            retries until the connection call succeeds.
        retry_delay: Seconds to pause before every connection attempt.
        settle_delay: Seconds to pause after a successful ``connect`` call.

    Returns:
        ``0`` once ``client.connect`` has returned without raising.
    """
    print("connecting ", client)
    badcount = 0  # counter for bad connection attempts
    while True:
        logging.info("connecting to broker %s", broker)
        print("Attempts ", str(badcount))
        time.sleep(retry_delay)
        try:
            client.username_pw_set(token)
            client.connect(broker, port, keepalive)
        except Exception as exc:
            # Best-effort retry, but record why the attempt failed.  Catching
            # Exception (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate so the program can still be stopped.
            badcount += 1
            logging.warning("connection to %s failed (attempt %d): %s",
                            broker, badcount, exc)
            continue
        time.sleep(settle_delay)
        return 0
def wait_for(client, msgType, period=1, wait_time=15, running_loop=False):
    """Poll *client* until the event named by *msgType* has been flagged.

    Args:
        client: MQTT client whose callback attributes and ``*_flag``
            attributes are inspected.
        msgType: One of ``"CONNACK"``, ``"SUBACK"``, ``"MESSAGE"`` or
            ``"PUBACK"``.  Any other value simply waits until the timeout.
        period: Seconds to sleep between polls.
        wait_time: Number of polls tolerated before giving up (default 15,
            i.e. roughly ``period * wait_time`` seconds).
        running_loop: True when the caller uses ``loop_start`` or
            ``loop_forever``; otherwise ``client.loop`` is driven manually.

    Returns:
        True when the matching flag is set; False on timeout or, for
        ``"CONNACK"``, when ``bad_connection_flag`` is set.
    """
    # Map each message type to (callback attribute, flag attribute).
    watched = {
        "CONNACK": ("on_connect", "connected_flag"),
        "SUBACK": ("on_subscribe", "suback_flag"),
        "MESSAGE": ("on_message", "message_received_flag"),
        "PUBACK": ("on_publish", "puback_flag"),
    }.get(msgType)

    client.running_loop = running_loop
    wcount = 0
    while True:
        logging.info("waiting" + msgType)
        if watched is not None:
            callback_attr, flag_attr = watched
            # Flags are only meaningful when the matching callback is set.
            # A flag that was never initialised counts as "not yet" instead
            # of raising AttributeError.
            if getattr(client, callback_attr, None):
                if getattr(client, flag_attr, False):
                    return True
                if msgType == "CONNACK" and getattr(
                        client, "bad_connection_flag", False):
                    return False
        if not client.running_loop:
            client.loop(.01)  # check for messages manually
        time.sleep(period)
        wcount += 1
        if wcount > wait_time:
            print("return from wait loop taken too long")
            return False
def client_loop(client, broker, port, token, keepalive=600, loop_function=None,
                loop_delay=10, run_forever=False):
    """Run one client's connect/publish loop until it is told to stop.

    The loop (re)connects whenever ``client.connected_flag`` is false, drives
    the network loop manually via ``client.loop`` and, while connected, calls
    *loop_function* on every pass.

    Args:
        client: MQTT client; ``connected_flag`` and ``bad_connection_flag``
            must already exist on it (or on its class).
        broker: Broker host name.
        port: Broker TCP port.
        token: Device access token forwarded to ``Connect``.
        keepalive: Keep-alive interval in seconds forwarded to ``Connect``.
        loop_function: Optional callable invoked as
            ``loop_function(client, loop_delay)`` while connected.
        loop_delay: Value passed through to *loop_function*.
        run_forever: Forwarded to ``Connect``.
    """
    client.run_flag = True
    client.broker = broker
    print("running loop ")
    client.reconnect_delay_set(min_delay=1, max_delay=12)
    while client.run_flag:  # loop until another thread clears run_flag
        if client.bad_connection_flag:
            break
        if not client.connected_flag:
            print("Connecting to " + broker)
            if Connect(client, broker, port, token, keepalive, run_forever) == -1:
                # connect fails
                client.run_flag = False  # break
                print("quitting loop for broker ", broker)
            else:
                # A missing CONNACK is not fatal: the next pass retries the
                # connection.  run_flag is deliberately NOT set back to True
                # here, so a shutdown request made by another thread while
                # we were waiting is still honoured.
                wait_for(client, "CONNACK")
        client.loop(0.01)
        if client.connected_flag and loop_function:  # function to call
            loop_function(client, loop_delay)  # call function
    time.sleep(1)
    print("disconnecting from", broker)
    if client.connected_flag:
        client.disconnect()
        client.connected_flag = False
def on_log(client, userdata, level, buf):
    """Print the log text *buf* to stdout; the other arguments are ignored."""
    log_line = buf
    print(log_line)
def on_connect(client, userdata, flags, rc):
    """Record the result of a connection attempt on *client*.

    Args:
        client: The MQTT client the callback is attached to.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; ``0`` means the connection was accepted.

    On success ``client.connected_flag`` is set.  On any other result code
    the code is printed, appended to ``bad_connections.txt`` (one line per
    failure) and ``client.loop_stop()`` is called.
    """
    if rc == 0:
        client.connected_flag = True  # set flag
        return
    print("Bad connection Returned code=", rc)
    # The context manager closes the file even if the write fails.
    with open("bad_connections.txt", "a") as report:  # append mode
        report.write("Bad connection Returned code=%s\n" % rc)
    client.loop_stop()
def on_disconnect(client, userdata, rc):
    """Clear ``client.connected_flag`` and report the disconnect on stdout.

    *userdata* and *rc* are accepted but not inspected.
    """
    client.connected_flag = False  # set flag
    print("client disconnected ok")
def on_publish(client, userdata, mid):
    """Print the message id *mid* passed to the publish callback."""
    message_id = mid
    print("In on_pub callback mid= ", message_id)
def pub(client, loop_delay):
    """Publish one random telemetry reading, plus hourly statistics.

    Every call publishes a Current/Pressure reading to
    ``v1/devices/me/telemetry``.  Once at least 3600 seconds have passed
    since this client's last statistics message (or since its first call),
    a second message with mean and standard-deviation values is published too.

    The hourly timer is stored on the client as ``client.last_stats_time``
    rather than in a process-wide global.  With a shared global, the first
    client thread to pass the hour reset the timer for every other client in
    the process, so the others never sent their statistics.

    Args:
        client: MQTT client exposing ``publish(topic, payload)``.
        loop_delay: Seconds to sleep before returning.
    """
    topic = 'v1/devices/me/telemetry'
    rmd_current = round(random.uniform(0.6, 50.0), 2)
    rmd_pressure = round(random.uniform(0.6, 50.0), 2)

    now = time.time()
    last_stats_time = getattr(client, "last_stats_time", None)
    if last_stats_time is None:
        # First call for this client: start its hourly timer now.
        client.last_stats_time = last_stats_time = now

    client.publish(topic,
                   '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                   '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))

    if now - last_stats_time >= 3600:
        rmd_mnc = round(random.uniform(5.0, 30.0), 2)
        rmd_sdc = round(random.random(), 2)
        rmd_mnp = round(random.uniform(5.0, 30.0), 2)
        rmd_sdp = round(random.random(), 2)
        client.publish(topic,
                       '{"MnC": "%s", "SdC": "%s", "Str": "2554","Stp": "2554", '
                       '"MnP": "%s", "SdP": "%s"}' % (rmd_mnc, rmd_sdc, rmd_mnp, rmd_sdp))
        client.last_stats_time = time.time()

    print(datetime.datetime.now())
    time.sleep(loop_delay)
def Create_connections(n_clients, threads):
    """Create an MQTT client and a worker thread for every device entry.

    Each entry of *n_clients* is a mapping with ``name``, ``broker``,
    ``port`` and ``token`` keys.  The entry gains ``client``, ``client_id``
    and ``cname`` keys, and a started thread running ``client_loop`` is
    appended to *threads*.
    """
    for device in n_clients:
        label = "client-" + device["name"]
        stamp = int(time.time())
        unique_id = label + str(stamp)  # create unique client_id
        mqtt_client = mqtt.Client(unique_id)  # create new instance

        device["client"] = mqtt_client
        device["client_id"] = unique_id
        device["cname"] = label

        mqtt_client.on_connect = on_connect
        mqtt_client.on_disconnect = on_disconnect
        mqtt_client.on_publish = on_publish
        #mqtt_client.on_message = on_message

        worker = threading.Thread(
            target=client_loop,
            args=(mqtt_client, device["broker"], device["port"],
                  device["token"], 600, pub),
        )
        threads.append(worker)
        worker.start()
def main_loop(clients_loop):
    """Start all clients in *clients_loop* and monitor them until interrupted.

    Args:
        clients_loop: Sequence of device mappings as expected by
            ``Create_connections``.

    Every 10 seconds the thread count is printed and each client whose
    ``connected_flag`` is false is reported on stdout and appended to
    ``disconnects.txt`` (one line per report).  On ``KeyboardInterrupt`` each
    client's ``run_flag`` is cleared and the function waits 10 seconds
    before returning.
    """
    mqtt.Client.connected_flag = False  # create flag in class
    mqtt.Client.bad_connection_flag = False  # create flag in class
    threads = []
    print("Creating Connections ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("Publishing ")
    Create_connections(clients_loop, threads)
    print("All clients connected ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("starting main loop")
    try:
        while True:
            time.sleep(10)
            no_threads = threading.active_count()
            print("current threads =", no_threads)
            for c in clients_loop:
                if not c["client"].connected_flag:
                    print("broker ", c["broker"], " is disconnected", c["name"])
                    # The context manager closes the file even if the write fails.
                    with open("disconnects.txt", "a") as report:  # append mode
                        report.write("broker %s is disconnected %s\n" % (c["broker"], c["name"]))
                    time.sleep(1)  # brief pause after each report
                    #sys.exit("A connection was dropped")
    except KeyboardInterrupt:
        print("ending")
        for c in clients_loop:
            c["client"].run_flag = False
    time.sleep(10)
if __name__ == '__main__':
    # In case the user is using a demo version or local version of thingsboard
    things_location = input("What type of thingsboard installation are you working with (demo/local)? ")
    if things_location == "demo":
        type_install = "demo.thingsboard.io"
        # The demo branch never defined `broker`, which made the device loop
        # below fail with NameError; the demo host is used as MQTT broker too.
        broker = type_install
        header = Things.get_credentials(things_location)
    elif things_location == "local":
        computer = input("Which computer? ")
        type_install = "cseetprj%s.essex.ac.uk:8080" % computer
        broker = "cseetprj%s.essex.ac.uk" % computer
        header = Things.get_credentials("local", type_install)
    else:
        # Stop here: nothing below can work without credentials and a broker.
        sys.exit("Error: Installation not supported")

    my_devices = Things.get_devices_id(header, type_install)
    clients = [
        {
            "broker": broker,
            "port": 1883,
            "name": device["name"],
            "token": Things.get_device_token(device["id"]["id"], header, type_install),
        }
        for device in my_devices
    ]
    print(len(clients))
    time.sleep(5)

    if len(clients) >= 200:
        print("Splitting devices to multiprocess")
        # Aim for at most ~250 clients (one thread each) per process.
        split_by = math.ceil(len(clients) / 250)
        split_clients = np.array_split(clients, split_by)
        jobs = []
        for idx, client_portion in enumerate(split_clients):
            print("Starting process for portion %s" % (idx + 1))
            p = multiprocessing.Process(target=main_loop, args=(client_portion,))
            jobs.append(p)
            p.start()
        for job in jobs:
            job.join()
            print("Ending process")
    else:
        # Small fleets previously started nothing at all; run them in-process.
        main_loop(clients)
When testing with only 1K devices, some clients get rc=3 for some reason (the number varies and I don't know why), and other clients end up disconnecting (I also don't know why). Is there something wrong with the code? I'm planning to send data for up to 10K devices, but I can't even establish 1K stable connections.
The specs of the machine hosting the IoT platform (which receives the connections) are the following: Processor: Intel Core i5-3570 CPU @ 3.40GHz x 4
RAM: 8GB
Disk Size 500GB
Thanks in advance
Get help from others!
Recent Answers
Recent Questions
© 2024 TransWikia.com. All rights reserved. Sites we Love: PCI Database, UKBizDB, Menu Kuliner, Sharing RPP