MythTV  master
iptvstreamhandler.cpp
Go to the documentation of this file.
1 // -*- Mode: c++ -*-
2 
3 // System headers
4 #ifdef _WIN32
5 # include <ws2tcpip.h>
6 #else
7 # include <sys/types.h>
8 # include <sys/socket.h>
9 # include <netinet/in.h>
10 # include <netinet/ip.h>
11 #endif
12 
13 // Qt headers
14 #include <QUdpSocket>
15 #include <QByteArray>
16 #include <QHostInfo>
17 
18 // MythTV headers
19 #include "iptvstreamhandler.h"
20 #include "rtppacketbuffer.h"
21 #include "udppacketbuffer.h"
22 #include "rtptsdatapacket.h"
23 #include "rtpdatapacket.h"
24 #include "rtpfecpacket.h"
25 #include "rtcpdatapacket.h"
26 #include "mythlogging.h"
27 #include "cetonrtsp.h"
28 
29 #define LOC QString("IPTVSH[%1](%2): ").arg(_inputid).arg(_device)
30 
31 QMap<QString,IPTVStreamHandler*> IPTVStreamHandler::s_iptvhandlers;
34 
36  int inputid)
37 {
38  QMutexLocker locker(&s_iptvhandlers_lock);
39 
40  QString devkey = tuning.GetDeviceKey();
41 
42  QMap<QString,IPTVStreamHandler*>::iterator it = s_iptvhandlers.find(devkey);
43 
44  if (it == s_iptvhandlers.end())
45  {
46  IPTVStreamHandler *newhandler = new IPTVStreamHandler(tuning, inputid);
47  newhandler->Start();
48  s_iptvhandlers[devkey] = newhandler;
49  s_iptvhandlers_refcnt[devkey] = 1;
50 
51  LOG(VB_RECORD, LOG_INFO,
52  QString("IPTVSH[%1]: Creating new stream handler %2 for %3")
53  .arg(inputid).arg(devkey).arg(tuning.GetDeviceName()));
54  }
55  else
56  {
57  s_iptvhandlers_refcnt[devkey]++;
58  uint rcount = s_iptvhandlers_refcnt[devkey];
59  LOG(VB_RECORD, LOG_INFO,
60  QString("IPTVSH[%1]: Using existing stream handler %2 for %3")
61  .arg(inputid).arg(devkey).arg(tuning.GetDeviceName()) +
62  QString(" (%1 in use)").arg(rcount));
63  }
64 
65  return s_iptvhandlers[devkey];
66 }
67 
68 void IPTVStreamHandler::Return(IPTVStreamHandler * & ref, int inputid)
69 {
70  QMutexLocker locker(&s_iptvhandlers_lock);
71 
72  QString devname = ref->_device;
73 
74  QMap<QString,uint>::iterator rit = s_iptvhandlers_refcnt.find(devname);
75  if (rit == s_iptvhandlers_refcnt.end())
76  return;
77 
78  LOG(VB_RECORD, LOG_INFO, QString("IPTVSH[%1]: Return(%2) has %3 handlers")
79  .arg(inputid).arg(devname).arg(*rit));
80 
81  if (*rit > 1)
82  {
83  ref = nullptr;
84  (*rit)--;
85  return;
86  }
87 
88  QMap<QString,IPTVStreamHandler*>::iterator it = s_iptvhandlers.find(devname);
89  if ((it != s_iptvhandlers.end()) && (*it == ref))
90  {
91  LOG(VB_RECORD, LOG_INFO, QString("IPTVSH[%1]: Closing handler for %2")
92  .arg(inputid).arg(devname));
93  ref->Stop();
94  delete *it;
95  s_iptvhandlers.erase(it);
96  }
97  else
98  {
99  LOG(VB_GENERAL, LOG_ERR,
100  QString("IPTVSH[%1] Error: Couldn't find handler for %2")
101  .arg(inputid).arg(devname));
102  }
103 
104  s_iptvhandlers_refcnt.erase(rit);
105  ref = nullptr;
106 }
107 
109  : StreamHandler(tuning.GetDeviceKey(), inputid)
110  , m_tuning(tuning)
111  , m_write_helper(nullptr)
112  , m_buffer(nullptr)
113  , m_rtsp_rtp_port(0)
114  , m_rtsp_rtcp_port(0)
115  , m_rtsp_ssrc(0)
116 {
117  memset(m_sockets, 0, sizeof(m_sockets));
118  memset(m_read_helpers, 0, sizeof(m_read_helpers));
120 }
121 
123 {
124  RunProlog();
125 
126  LOG(VB_GENERAL, LOG_INFO, LOC + "run()");
127 
128  SetRunning(true, false, false);
129 
130  // TODO Error handling..
131 
132  // Setup
133  CetonRTSP *rtsp = nullptr;
134  IPTVTuningData tuning = m_tuning;
135  if(m_tuning.IsRTSP())
136  {
137  rtsp = new CetonRTSP(m_tuning.GetURL(0));
138 
139  // Check RTSP capabilities
140  QStringList options;
141  if (!(rtsp->GetOptions(options) && options.contains("DESCRIBE") &&
142  options.contains("SETUP") && options.contains("PLAY") &&
143  options.contains("TEARDOWN")))
144  {
145  LOG(VB_RECORD, LOG_ERR, LOC +
146  "RTSP interface did not support the necessary options");
147  delete rtsp;
148  SetRunning(false, false, false);
149  RunEpilog();
150  return;
151  }
152 
153  if (!rtsp->Describe())
154  {
155  LOG(VB_RECORD, LOG_ERR, LOC +
156  "RTSP Describe command failed");
157  delete rtsp;
158  SetRunning(false, false, false);
159  RunEpilog();
160  return;
161  }
162 
163  m_use_rtp_streaming = true;
164 
165  QUrl urltuned = m_tuning.GetURL(0);
166  urltuned.setScheme("rtp");
167  urltuned.setPort(0);
168  tuning = IPTVTuningData(urltuned.toString(), 0, IPTVTuningData::kNone,
169  urltuned.toString(), 0, "", 0);
170  }
171 
172  bool error = false;
173 
174  int start_port = 0;
175  for (uint i = 0; i < IPTV_SOCKET_COUNT; i++)
176  {
177  QUrl url = tuning.GetURL(i);
178  if (url.port() < 0)
179  continue;
180 
181  LOG(VB_RECORD, LOG_DEBUG, LOC +
182  QString("setting up url[%1]:%2").arg(i).arg(url.toString()));
183 
184  // always ensure we use consecutive port numbers
185  int port = start_port ? start_port + 1 : url.port();
186  QString host = url.host();
187  QHostAddress dest_addr(host);
188 
189  if (!host.isEmpty() && dest_addr.isNull())
190  {
191  // address is a hostname; attempt to resolve it
192  QHostInfo info = QHostInfo::fromName(host);
193  QList<QHostAddress> list = info.addresses();
194 
195  if (list.isEmpty())
196  {
197  LOG(VB_RECORD, LOG_ERR, LOC +
198  QString("Can't resolve hostname:'%1'").arg(host));
199  }
200  else
201  {
202  for (int j=0; j < list.size(); j++)
203  {
204  dest_addr = list[j];
205  if (list[j].protocol() == QAbstractSocket::IPv6Protocol)
206  {
207  // We prefer first IPv4
208  break;
209  }
210  }
211  LOG(VB_RECORD, LOG_DEBUG, LOC +
212  QString("resolved %1 as %2").arg(host).arg(dest_addr.toString()));
213  }
214  }
215  bool ipv6 = dest_addr.protocol() == QAbstractSocket::IPv6Protocol;
216  bool is_multicast = ipv6 ?
217  dest_addr.isInSubnet(QHostAddress::parseSubnet("ff00::/8")) :
218  (dest_addr.toIPv4Address() & 0xf0000000) == 0xe0000000;
219 
220  m_sockets[i] = new QUdpSocket();
221  if (!is_multicast)
222  {
223  // this allows us to filter incoming traffic and make sure it comes
224  // from the requested server
225  m_sender[i] = dest_addr;
226  }
228  this, m_sockets[i], i);
229 
230  // we need to open the descriptor ourselves so we
231  // can set some socket options
232  int fd = socket(ipv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0); // create IPv4 socket
233  if (fd < 0)
234  {
235  LOG(VB_GENERAL, LOG_ERR, LOC +
236  "Unable to create socket " + ENO);
237  continue;
238  }
239  int buf_size = 2 * 1024 * max(tuning.GetBitrate(i)/1000, 500U);
240  if (!tuning.GetBitrate(i))
241  buf_size = 2 * 1024 * 1024;
242  int err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
243  (char *)&buf_size, sizeof(buf_size));
244  if (err)
245  {
246  LOG(VB_GENERAL, LOG_INFO, LOC +
247  QString("Increasing buffer size to %1 failed")
248  .arg(buf_size) + ENO);
249  }
250 
251  m_sockets[i]->setSocketDescriptor(
252  fd, QAbstractSocket::UnconnectedState, QIODevice::ReadOnly);
253 
254  // we bind to destination address if it's a multicast address, or
255  // the local ones otherwise
256  if (!m_sockets[i]->bind(is_multicast ?
257  dest_addr :
258  (ipv6 ? QHostAddress::AnyIPv6 : QHostAddress::Any),
259  port))
260  {
261  LOG(VB_GENERAL, LOG_ERR, LOC + "Binding to port failed.");
262  error = true;
263  }
264  else
265  {
266  start_port = m_sockets[i]->localPort();
267  }
268 
269  if (is_multicast)
270  {
271  m_sockets[i]->joinMulticastGroup(dest_addr);
272  LOG(VB_GENERAL, LOG_INFO, LOC + QString("Joining %1")
273  .arg(dest_addr.toString()));
274  }
275 
276  if (!is_multicast && rtsp && i == 1)
277  {
278  m_rtcp_dest = dest_addr;
279  }
280  }
281 
282  if (!error)
283  {
284  if (m_tuning.IsRTP() || m_tuning.IsRTSP())
285  m_buffer = new RTPPacketBuffer(tuning.GetBitrate(0));
286  else
287  m_buffer = new UDPPacketBuffer(tuning.GetBitrate(0));
291  }
292 
293  if (!error && rtsp)
294  {
295  // Start Streaming
296  if (!rtsp->Setup(m_sockets[0]->localPort(), m_sockets[1]->localPort(),
298  !rtsp->Play())
299  {
300  LOG(VB_RECORD, LOG_ERR, LOC +
301  "Starting recording (RTP initialization failed). Aborting.");
302  error = true;
303  }
304  if (m_rtsp_rtcp_port > 0)
305  {
308  }
309  }
310 
311  if (!error)
312  {
313  // Enter event loop
314  exec();
315  }
316 
317  // Clean up
318  for (uint i = 0; i < IPTV_SOCKET_COUNT; i++)
319  {
320  if (m_sockets[i])
321  {
322  delete m_sockets[i];
323  m_sockets[i] = nullptr;
324  delete m_read_helpers[i];
325  m_read_helpers[i] = nullptr;
326  }
327  }
328  delete m_buffer;
329  m_buffer = nullptr;
330  delete m_write_helper;
331  m_write_helper = nullptr;
332 
333  if (rtsp)
334  {
335  rtsp->Teardown();
336  delete rtsp;
337  }
338 
339  SetRunning(false, false, false);
340  RunEpilog();
341 }
342 
344  IPTVStreamHandler *p, QUdpSocket *s, uint stream) :
345  m_parent(p), m_socket(s), m_sender(p->m_sender[stream]),
346  m_stream(stream)
347 {
348  connect(m_socket, SIGNAL(readyRead()),
349  this, SLOT(ReadPending()));
350 }
351 
352 #define LOC_WH QString("IPTVSH(%1): ").arg(m_parent->_device)
353 
355 {
356  QHostAddress sender;
357  quint16 senderPort;
358  bool sender_null = m_sender.isNull();
359 
360  if (0 == m_stream)
361  {
362  while (m_socket->hasPendingDatagrams())
363  {
365  QByteArray &data = packet.GetDataReference();
366  data.resize(m_socket->pendingDatagramSize());
367  m_socket->readDatagram(data.data(), data.size(),
368  &sender, &senderPort);
369  if (sender_null || sender == m_sender)
370  {
371  m_parent->m_buffer->PushDataPacket(packet);
372  }
373  else
374  {
375  LOG(VB_RECORD, LOG_WARNING, LOC_WH +
376  QString("Received on socket(%1) %2 bytes from non expected "
377  "sender:%3 (expected:%4) ignoring")
378  .arg(m_stream).arg(data.size())
379  .arg(sender.toString()).arg(m_sender.toString()));
380  }
381  }
382  }
383  else
384  {
385  while (m_socket->hasPendingDatagrams())
386  {
388  QByteArray &data = packet.GetDataReference();
389  data.resize(m_socket->pendingDatagramSize());
390  m_socket->readDatagram(data.data(), data.size(),
391  &sender, &senderPort);
392  if (sender_null || sender == m_sender)
393  {
394  m_parent->m_buffer->PushFECPacket(packet, m_stream - 1);
395  }
396  else
397  {
398  LOG(VB_RECORD, LOG_WARNING, LOC_WH +
399  QString("Received on socket(%1) %2 bytes from non expected "
400  "sender:%3 (expected:%4) ignoring")
401  .arg(m_stream).arg(data.size())
402  .arg(sender.toString()).arg(m_sender.toString()));
403  }
404  }
405  }
406 }
407 
409  : m_parent(p), m_timer(0), m_timer_rtcp(0),
410  m_last_sequence_number(0), m_last_timestamp(0), m_previous_last_sequence_number(0),
411  m_lost(0), m_lost_interval(0)
412 {
413 }
414 
416 {
417  if (m_timer)
418  {
419  killTimer(m_timer);
420  }
421  if (m_timer_rtcp)
422  {
423  killTimer(m_timer_rtcp);
424  }
425  m_timer = 0;
426  m_timer_rtcp = 0;
427  m_parent = nullptr;
428 }
429 
431 {
432  if (event->timerId() == m_timer_rtcp)
433  {
434  SendRTCPReport();
435  return;
436  }
437 
439  return;
440 
441  while (!m_parent->m_use_rtp_streaming)
442  {
444 
445  if (packet.GetDataReference().isEmpty())
446  break;
447 
448  int remainder = 0;
449  {
450  QMutexLocker locker(&m_parent->_listener_lock);
451  QByteArray &data = packet.GetDataReference();
452  IPTVStreamHandler::StreamDataList::const_iterator sit;
453  sit = m_parent->_stream_data_list.begin();
454  for (; sit != m_parent->_stream_data_list.end(); ++sit)
455  {
456  remainder = sit.key()->ProcessData(
457  reinterpret_cast<const unsigned char*>(data.data()),
458  data.size());
459  }
460  }
461 
462  if (remainder != 0)
463  {
464  LOG(VB_RECORD, LOG_INFO, LOC_WH +
465  QString("data_length = %1 remainder = %2")
466  .arg(packet.GetDataReference().size()).arg(remainder));
467  }
468 
469  m_parent->m_buffer->FreePacket(packet);
470  }
471 
473  {
475 
476  if (!packet.IsValid())
477  break;
478 
479  if (packet.GetPayloadType() == RTPDataPacket::kPayLoadTypeTS)
480  {
481  RTPTSDataPacket ts_packet(packet);
482 
483  if (!ts_packet.IsValid())
484  {
485  m_parent->m_buffer->FreePacket(packet);
486  continue;
487  }
488 
489  uint exp_seq_num = m_last_sequence_number + 1;
490  uint seq_num = ts_packet.GetSequenceNumber();
492  ((exp_seq_num&0xFFFF) != (seq_num&0xFFFF)))
493  {
494  LOG(VB_RECORD, LOG_INFO, LOC_WH +
495  QString("Sequence number mismatch %1!=%2")
496  .arg(seq_num).arg(exp_seq_num));
497  if (seq_num > exp_seq_num)
498  {
499  m_lost_interval = seq_num - exp_seq_num;
501  }
502  }
503  m_last_sequence_number = seq_num;
504  m_last_timestamp = ts_packet.GetTimeStamp();
505  LOG(VB_RECORD, LOG_DEBUG,
506  QString("Processing RTP packet(seq:%1 ts:%2)")
508 
509  m_parent->_listener_lock.lock();
510 
511  int remainder = 0;
512  IPTVStreamHandler::StreamDataList::const_iterator sit;
513  sit = m_parent->_stream_data_list.begin();
514  for (; sit != m_parent->_stream_data_list.end(); ++sit)
515  {
516  remainder = sit.key()->ProcessData(
517  ts_packet.GetTSData(), ts_packet.GetTSDataSize());
518  }
519 
520  m_parent->_listener_lock.unlock();
521 
522  if (remainder != 0)
523  {
524  LOG(VB_RECORD, LOG_INFO, LOC_WH +
525  QString("data_length = %1 remainder = %2")
526  .arg(ts_packet.GetTSDataSize()).arg(remainder));
527  }
528  }
529  m_parent->m_buffer->FreePacket(packet);
530  }
531 }
532 
534 {
535  if (m_parent->m_rtcp_dest.isNull())
536  {
537  // no point sending data if we don't know where to
538  return;
539  }
541  RTCPDataPacket rtcp =
545  QByteArray buf = rtcp.GetData();
546 
547  LOG(VB_RECORD, LOG_DEBUG, LOC_WH +
548  QString("Sending RTCPReport to %1:%2")
549  .arg(m_parent->m_rtcp_dest.toString())
550  .arg(m_parent->m_rtsp_rtcp_port));
551  m_parent->m_sockets[1]->writeDatagram(buf.constData(), buf.size(),
554 }
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:216
IPTVStreamHandler(const IPTVTuningData &tuning, int inputid)
uint GetTimeStamp(void) const
UDPPacket GetEmptyPacket(void)
Gets a packet for use in PushDataPacket/PushFECPacket.
QUrl GetURL(uint i) const
IPTVTuningData m_tuning
QByteArray GetData(void) const
UDP Packet.
Definition: udppacket.h:20
QUdpSocket * m_sockets[IPTV_SOCKET_COUNT]
RTP Transport Stream Data Packet.
#define LOC_WH
static void error(const char *str,...)
Definition: vbi.c:41
virtual void PushFECPacket(const UDPPacket &, unsigned int)=0
uint GetSequenceNumber(void) const
static QMap< QString, IPTVStreamHandler * > s_iptvhandlers
bool Setup(ushort clientPort1, ushort clientPort2, ushort &rtpPort, ushort &rtcpPort, uint32_t &ssrc)
Definition: cetonrtsp.cpp:407
QByteArray & GetDataReference(void)
Definition: udppacket.h:41
unsigned int uint
Definition: compat.h:140
virtual void PushDataPacket(const UDPPacket &)=0
bool HasAvailablePacket(void) const
Returns true if there are ordered packets ready for processing.
IPTVStreamHandler * m_parent
friend class IPTVStreamHandlerWriteHelper
IPTVStreamHandlerReadHelper * m_read_helpers[IPTV_SOCKET_COUNT]
bool Describe(void)
Definition: cetonrtsp.cpp:336
RTP Data Packet.
Definition: rtpdatapacket.h:30
bool Play(void)
Definition: cetonrtsp.cpp:462
#define IPTV_SOCKET_COUNT
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
bool IsRTSP(void) const
RTCP Data Packet.
void Stop(void)
bool GetOptions(QStringList &options)
Definition: cetonrtsp.cpp:254
void Start(void)
static QMap< QString, uint > s_iptvhandlers_refcnt
friend class IPTVStreamHandlerReadHelper
uint GetBitrate(uint i) const
void timerEvent(QTimerEvent *) override
QMutex _listener_lock
static IPTVStreamHandler * Get(const IPTVTuningData &tuning, int inputid)
IPTVStreamHandler * m_parent
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:99
#define LOC
QString GetDeviceName(void) const
bool Teardown(void)
Definition: cetonrtsp.cpp:470
QString GetDeviceKey(void) const
StreamDataList _stream_data_list
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
PacketBuffer * m_buffer
QHostAddress m_rtcp_dest
void FreePacket(const UDPPacket &)
Frees an RTPDataPacket returned by PopDataPacket.
const unsigned char * GetTSData(void) const
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:203
IPTVStreamHandlerWriteHelper * m_write_helper
UDPPacket PopDataPacket(void)
Fetches a data packet for processing.
QHostAddress m_sender[IPTV_SOCKET_COUNT]
IPTVStreamHandlerReadHelper(IPTVStreamHandler *p, QUdpSocket *s, uint stream)
unsigned int GetTSDataSize(void) const
bool IsValid(void) const override
IsValid() must return true before any data access methods are called, other than GetDataReference() a...
Definition: rtpdatapacket.h:45
int exec(void)
Enters the qt event loop. call exit or quit to exit thread.
Definition: mthread.cpp:329
static QMutex s_iptvhandlers_lock
bool IsRTP(void) const
static void Return(IPTVStreamHandler *&ref, int inputid)
#define RTCP_TIMER
IPTVStreamHandlerWriteHelper(IPTVStreamHandler *)
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.