[spark streaming] Communication client serveur
Bonjour à tous,
Je suis débutant en spark streaming et je suis en train de faire quelques tests. J'envoi côté serveur des tweets récupérés via l'API Twitter et de l'autre côté, le client effectue un traitement dessus. Le problème est que je ne reçois pas instantanément les données côté client pour effectuer ce traitement. Avec un netcat (nc -lk localhost 5555) cela fonctionne mais avec ce que j'ai là aucun traitement ne se fait..
Serveur:
Code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| class TweetsListener(StreamListener):
def __init__(self, csocket):
self.client_socket = csocket
def on_data(self, data):
try:
msg = json.loads( data )
print( msg['text'].encode('utf-8') )
self.client_socket.send( msg['text'].encode('utf-8') )
print ("c'est bon")
return True
except BaseException as e:
print("Error on_data: %s" % str(e))
return True
def on_error(self, status):
print(status)
return True
def sendData(c_socket):
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
twitter_stream = Stream(auth, TweetsListener(c_socket))
twitter_stream.userstream(_with="user")
if __name__ == "__main__":
s = socket.socket() # Create a socket object
host = "localhost" # Get local machine name
port = 5555 # Reserve a port for your service.
s.bind((host, port)) # Bind to the port
print("Listening on port: %s" % str(port))
s.listen(5) # Now wait for client connection.
c, addr = s.accept() # Establish connection with client.
print( "Received request from: " + str( addr ) )
sendData(c) |
Client :
Code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| sc = SparkContext(appName="Test")
ssc = StreamingContext(sc, 10)
socket_stream = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
#Creation window with the data in RDD created -- RDD data every 20 sec
lines = socket_stream.window(20)
print(lines)
counts = (lines.flatMap(lambda line: line.split(" "))\
.filter(lambda word: word.lower().startswith("#")) \
.map(lambda word: (word.lower(),1)) \
.reduceByKey(lambda a, b: a+b))
counts.pprint()
ssc.start()
ssc.awaitTermination() |
Comment puis-je régler ce problème?
Merci à vous.