Closed
Description
Just upgraded to 1.2.0 and the KafkaConsumer stopped working immediately, throwing a ValueError.
Even when using the most simplistic KafkaConsumer the error is thrown at the first record consumed:
kafka_consumer = KafkaConsumer(
topic,
group_id=kafka_group_id,
bootstrap_servers=kafka_servers
)
for message in kafka_consumer:
print(message)
Error:
e/import/opsc.py", line 7, in <module>\n for message in kafka_consumer:
venv1/lib64/python3.4/site-packages/kafka/consumer/group.py", line 850, in __next__\n return next(self._iterator)
venv1/lib64/python3.4/site-packages/kafka/consumer/group.py", line 790, in _message_generator\n self._client.poll(timeout_ms=poll_ms, sleep=True)
venv1/lib64/python3.4/site-packages/kafka/client_async.py", line 436, in poll\n responses.extend(self._poll(timeout, sleep=sleep))
venv1/lib64/python3.4/site-packages/kafka/client_async.py", line 479, in _poll\n response = conn.recv() # Note: conn.recv runs callbacks / errbacks
venv1/lib64/python3.4/site-packages/kafka/conn.py", line 477, in recv\n response = self._process_response(self._rbuffer) venv1/lib64/python3.4/site-packages/kafka/conn.py", line 510, in _process_response\n response = ifr.response_type.decode(read_buffer)
venv1/lib64/python3.4/site-packages/kafka/protocol/struct.py", line 39, in decode\n return cls(*[field.decode(data) for field in cls.SCHEMA.fields])
venv1/lib64/python3.4/site-packages/kafka/protocol/struct.py", line 39, in <listcomp>\n return cls(*[field.decode(data) for field in cls.SCHEMA.fields])
venv1/lib64/python3.4/site-packages/kafka/protocol/types.py", line 155, in decode\n return [self.array_of.decode(data) for _ in range(length)]
venv1/lib64/python3.4/site-packages/kafka/protocol/types.py", line 155, in <listcomp>\n return [self.array_of.decode(data) for _ in range(length)] '
venv1/lib64/python3.4/site-packages/kafka/protocol/types.py", line 118, in decode\n return tuple([field.decode(data) for field in self.fields])
venv1/lib64/python3.4/site-packages/kafka/protocol/types.py", line 118, in <listcomp>\n return tuple([field.decode(data) for field in self.fields])
venv1/lib64/python3.4/site-packages/kafka/protocol/types.py", line 155, in decode\n return [self.array_of.decode(data) for _ in range(length)]
venv1/lib64/python3.4/site-packages/kafka/protocol/types.py", line 155, in <listcomp>\n return [self.array_of.decode(data) for _ in range(length)]
venv1/lib64/python3.4/site-packages/kafka/protocol/types.py", line 118, in decode\n return tuple([field.decode(data) for field in self.fields])
venv1/lib64/python3.4/site-packages/kafka/protocol/types.py", line 118, in <listcomp>\n return tuple([field.decode(data) for field in self.fields])
venv1/lib64/python3.4/site-packages/kafka/protocol/types.py", line 50, in decode\n return _unpack(\'>i\', data.read(4))
venv1/lib64/python3.4/site-packages/kafka/protocol/types.py", line 20, in _unpack\n raise ValueError(error)
It has worked in 1.1.1
Metadata
Metadata
Assignees
Labels
No labels