TransWikia.com

How can I resubmit data to a SocketConnect object without creating a new socket?

Mathematica Asked on May 17, 2021

I have a Python function that can be accessed over a TCP connection on port 1234. In Wolfram Language, I have the following code:

ClearAll[getResFromPython];
getResFromPython[sock_][arg_String] := (
    WriteString[sock, ExportString[<|"arg" -> arg|>, "JSON"]];
    ReadString[sock]
);

Since creating a new socket is not instantaneous (it takes nearly two seconds on my computer), I would like to do the following:

ClearAll[sock, vals];
sock = SocketConnect[{"localhost", 1234}, "TCP"];
vals = Map[getResFromPython[sock], {"arg1", "arg2", "arg3"}];
Close[sock];
(* Now do something with vals. *)

This does not work. Only "arg1" is evaluated correctly, and I think it is because the socket needs to be flushed before the next argument is written to it. But I cannot find any function in the documentation that would flush the socket. There is a WSFlush function, but it is for an entirely different purpose.

What is the correct way to reuse a socket connection over a list of arguments?

Edit

In case anyone is interested, here is my simple Python setup:

# tcpserver.py
import socketserver
import urllib.parse
import json

def getVal(arg):
    return 1

class TCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        self.data = json.loads(self.request.recv(2048).strip())
        res = getVal(self.data["arg"])
        self.request.sendall(bytes(json.dumps({"res": res}), "utf-8"))

if __name__ == "__main__":
    HOST, PORT = "localhost", 1234
    with socketserver.TCPServer((HOST, PORT), TCPHandler) as server:
        server.serve_forever()

One Answer

I grappled with this for a while before realizing that the problem is not Mathematica. The connection is closed as soon as TCPHandler.handle() returns, so it is not kept alive after the first request. You can either read on and use the code at the end of my answer, or connect once and write everything in a single batch, using the sequence form WriteString[sock, string1, string2, ..., stringn].
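If you take the batch route, keep in mind that the server would then receive several JSON documents back to back in a single read, and a plain json.loads call rejects that. The following is only a sketch of one way the server side could split such a batch; the helper name split_concatenated_json is hypothetical and not part of the original setup.

```python
import json


def split_concatenated_json(text):
    """Decode JSON documents that were written back to back into one string."""
    decoder = json.JSONDecoder()
    values = []
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so skip it here.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos == len(text):
            break
        # raw_decode returns the decoded value and the index where it ended.
        value, pos = decoder.raw_decode(text, pos)
        values.append(value)
    return values


if __name__ == "__main__":
    batch = '{"arg": "arg1"}{"arg": "arg2"}\n{"arg": "arg3"}'
    print(split_concatenated_json(batch))
```

The handler would still have to send one reply per decoded request, and the client would have to read all of them, so this only addresses the parsing side.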


If you use the basic server implementation below, you'll see that Mathematica is quite capable of multiple WriteString calls on the same socket over a single connection:

import socket
from time import sleep

TCP_IP = '127.0.0.1'
TCP_PORT = 1234
BUFFER_SIZE = 2048

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)

connection, addr = s.accept()
while True:
    try:
        connection.send("keeping alive".encode())
        data = connection.recv(BUFFER_SIZE)
        if data:
            print("received data:" + str(data))
            connection.send("got it!".encode())
        else:
            sleep(0.06)
    except KeyboardInterrupt:
        connection.close()
        break
    except socket.error:
        print("client disconnected")
        connection.close()
        break

... and from Mathematica:

sock = SocketConnect[{"localhost", 1234}, "TCP"];
WriteString[sock, "Hey!"]
Print@ByteArrayToString@SocketReadMessage[sock]
WriteString[sock, "What's up?"]
Print@ByteArrayToString@SocketReadMessage[sock]
Close[sock]

You should see the result server-side:

received data:b'Hey!'
received data:b"What's up?"
client disconnected

So we know it's possible. Let's look at a Wireshark packet dump for your TCPServer to see what's wrong. I monitored the loopback interface instead of Ethernet or Wi-Fi to produce the trace.

It looks like the server is the one terminating the connection. The server sends a FIN packet just before Mathematica is about to send the second item. Mathematica isn't expecting it and sends the second item anyway (the final [PSH, ACK] in the trace). Because that data arrives on a connection the server has already closed, the server answers with an RST and kills the connection hard, never handling the remaining requests. Once TCPHandler.handle() returns from processing the first item, the TCPServer doesn't keep the connection going.

[Image: Wireshark trace of the TCP packets]
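The same behaviour can be observed without Wireshark. The sketch below is a self-contained Python check, not the original setup: OneShotHandler is a hypothetical handler that, like the one in the question, reads once and replies once, and port 0 is used so the OS picks a free port. After the first reply, the client's next read sees end-of-stream because the connection was shut down when handle() returned.

```python
import socket
import socketserver
import threading


class OneShotHandler(socketserver.BaseRequestHandler):
    """Hypothetical handler mirroring the question's: one recv, one reply."""

    def handle(self):
        data = self.request.recv(2048)
        self.request.sendall(b"reply to " + data)
        # handle() returns here; socketserver then shuts the connection down.


def probe_one_shot_server():
    # Port 0 asks the OS for any free port (an assumption made for this demo).
    with socketserver.TCPServer(("127.0.0.1", 0), OneShotHandler) as server:
        threading.Thread(target=server.serve_forever, daemon=True).start()
        with socket.create_connection(server.server_address, timeout=5) as client:
            client.sendall(b"first")
            first = client.recv(2048)
            # The server has closed its end, so this read returns b"" (EOF).
            second = client.recv(2048)
        server.shutdown()
    return first, second


if __name__ == "__main__":
    print(probe_one_shot_server())
```

Here the client does not write again after the first reply, so it sees a clean end-of-stream rather than the reset shown in the trace.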

If you want to keep the connection alive, handle() has to loop and keep reading until the client disconnects, like this:

# tcpserver.py
import socketserver
import json

def getVal(arg):
    return 1

class TCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # Serve requests on this one connection until the client closes it.
        alive = True
        while alive:
            try:
                # Blocks until the client sends data or closes the connection.
                self.data = self.request.recv(2048)
                if self.data:
                    jsonresult = json.loads(self.data.strip())
                    res = getVal(jsonresult["arg"])
                    print(jsonresult)
                    self.request.sendall(bytes(json.dumps({"res": res}), "utf-8"))
                else:
                    # recv() returns b"" once the client has closed its end.
                    print("no data")
                    print("finished!")
                    alive = False
            except (OSError, ValueError, KeyError):
                # Connection error or malformed request: stop serving this client.
                print("finished!")
                alive = False


if __name__ == "__main__":
    HOST, PORT = "localhost", 1234
    with socketserver.TCPServer((HOST, PORT), TCPHandler) as server:
        server.socket.settimeout(None)
        server.serve_forever()

... and use the following slight modification to your Mathematica code (I've used SocketReadMessage instead of ReadString):

ClearAll[getResFromPython];
getResFromPython[sock_][arg_String] := (
   WriteString[sock, ExportString[<|"arg" -> arg|>, "JSON"]];
   ByteArrayToString@SocketReadMessage[sock]);

ClearAll[sock, vals];
sock = SocketConnect[{"localhost", 1234}, "TCP"];
vals = Map[getResFromPython[sock], {"arg1", "arg2", "arg3"}];
Close[sock];

Finally it works!

{'arg': 'arg1'}
{'arg': 'arg2'}
{'arg': 'arg3'}
no data
finished!
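If you want to check the server side without Mathematica, the same request/response cycle can be sketched in Python alone. This is a minimal sketch under assumptions: LoopingHandler and query_over_one_connection are hypothetical names, the handler upper-cases the argument just so each reply is distinguishable, and port 0 lets the OS pick a free port.

```python
import json
import socket
import socketserver
import threading


class LoopingHandler(socketserver.BaseRequestHandler):
    """Hypothetical handler: answers requests until the client disconnects."""

    def handle(self):
        while True:
            data = self.request.recv(2048)
            if not data:  # b"" means the client closed the connection
                break
            arg = json.loads(data)["arg"]
            reply = json.dumps({"res": arg.upper()})
            self.request.sendall(reply.encode("utf-8"))


def query_over_one_connection(args):
    """Send every argument over a single TCP connection, one reply each."""
    with socketserver.TCPServer(("127.0.0.1", 0), LoopingHandler) as server:
        threading.Thread(target=server.serve_forever, daemon=True).start()
        results = []
        with socket.create_connection(server.server_address, timeout=5) as client:
            for arg in args:
                client.sendall(json.dumps({"arg": arg}).encode("utf-8"))
                # Each request is answered before the next one is sent.
                results.append(json.loads(client.recv(2048))["res"])
        server.shutdown()
    return results


if __name__ == "__main__":
    print(query_over_one_connection(["arg1", "arg2", "arg3"]))
```

Because each request waits for its reply before the next one is written, a single recv per message is enough here; larger or pipelined messages would need explicit framing.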

It also works when you send several batches of requests with a delay in between, which you can do like this:

sock = SocketConnect[{"localhost", 1234}, "TCP"];
vals = Reap[Do[
     Sow[Map[getResFromPython[sock], {"arg1", "arg2", "arg3"}]];
     Pause[1];
     , 5]] // Last // First
Close[sock];

Correct answer by flinty on May 17, 2021
