Merge remote-tracking branch 'origin/master' into master-cmake

Conflicts:
	components/raop/raop.c
	components/raop/rtp.c
	main/cmd_squeezelite.c
This commit is contained in:
Sebastien
2020-03-08 09:54:50 -04:00
48 changed files with 951 additions and 481 deletions

View File

@@ -1,5 +1,5 @@
idf_component_register(SRCS dmap_parser.c raop_sink.c raop.c rtp.c util.c
idf_component_register(SRC_DIRS .
INCLUDE_DIRS .
REQUIRES newlib platform_config services codecs tools display

View File

@@ -44,7 +44,7 @@
#include "log_util.h"
#define RTSP_STACK_SIZE (8*1024)
#define SEARCH_STACK_SIZE (2*1048)
#define SEARCH_STACK_SIZE (3*1048)
typedef struct raop_ctx_s {
#ifdef WIN32
@@ -57,9 +57,9 @@ typedef struct raop_ctx_s {
struct in_addr peer; // IP of the iDevice (airplay sender)
bool running;
#ifdef WIN32
pthread_t thread, search_thread;
pthread_t thread;
#else
TaskHandle_t thread, search_thread, joiner;
TaskHandle_t thread, joiner;
StaticTask_t *xTaskBuffer;
StackType_t xStack[RTSP_STACK_SIZE] __attribute__ ((aligned (4)));
#endif
@@ -81,11 +81,12 @@ typedef struct raop_ctx_s {
char DACPid[32], id[32];
struct in_addr host;
u16_t port;
bool running;
#ifdef WIN32
struct mDNShandle_s *handle;
pthread_t thread;
#else
bool running;
TaskHandle_t thread, joiner;
TaskHandle_t thread;
StaticTask_t *xTaskBuffer;
StackType_t xStack[SEARCH_STACK_SIZE] __attribute__ ((aligned (4)));;
SemaphoreHandle_t destroy_mutex;
@@ -99,7 +100,7 @@ extern log_level raop_loglevel;
static log_level *loglevel = &raop_loglevel;
static void* rtsp_thread(void *arg);
static void abort_rtsp(raop_ctx_t *ctx);
static void cleanup_rtsp(raop_ctx_t *ctx, bool abort);
static bool handle_rtsp(raop_ctx_t *ctx, int sock);
static char* rsa_apply(unsigned char *input, int inlen, int *outlen, int mode);
@@ -218,7 +219,7 @@ void raop_delete(struct raop_ctx_s *ctx) {
}
/*----------------------------------------------------------------------------*/
void raop_delete(struct raop_ctx_s *ctx) {
void raop_delete(struct raop_ctx_s *ctx) {
#ifdef WIN32
int sock;
struct sockaddr addr;
@@ -240,25 +241,13 @@ void raop_delete(struct raop_ctx_s *ctx) {
rtp_end(ctx->rtp);
shutdown(ctx->sock, SD_BOTH);
shutdown(ctx->sock, SD_BOTH);
closesocket(ctx->sock);
// terminate search, but do not reclaim memory of pthread if never launched
if (ctx->active_remote.handle) {
close_mDNS(ctx->active_remote.handle);
pthread_join(ctx->active_remote.thread, NULL);
}
// stop broadcasting devices
mdns_service_remove(ctx->svr, ctx->svc);
mdnsd_stop(ctx->svr);
#else
// first stop the search task if any
if (ctx->active_remote.running) {
ctx->active_remote.joiner = xTaskGetCurrentTaskHandle();
ctx->active_remote.running = false;
vTaskResume(ctx->active_remote.thread);
}
// stop broadcasting devices
@@ -267,10 +256,11 @@ void raop_delete(struct raop_ctx_s *ctx) {
#else
// then the RTSP task
ctx->joiner = xTaskGetCurrentTaskHandle();
ctx->running = false;
ctx->running = false;
ulTaskNotifyTake(pdFALSE, portMAX_DELAY);
vTaskDelete(ctx->thread);
heap_caps_free(ctx->xTaskBuffer);
shutdown(ctx->sock, SHUT_RDWR);
closesocket(ctx->sock);
@@ -405,7 +395,7 @@ static void *rtsp_thread(void *arg) {
FD_SET(sock, &rfds);
n = select(sock + 1, &rfds, NULL, NULL, &timeout);
if (!n && !ctx->abort) continue;
if (n > 0) res = handle_rtsp(ctx, sock);
@@ -515,10 +505,10 @@ static bool handle_rtsp(raop_ctx_t *ctx, int sock)
NFREE(p);
}
// on announce, search remote
// on announce, search remote
if ((buf = kd_lookup(headers, "DACP-ID")) != NULL) strcpy(ctx->active_remote.DACPid, buf);
if ((buf = kd_lookup(headers, "Active-Remote")) != NULL) strcpy(ctx->active_remote.id, buf);
#ifdef WIN32
ctx->active_remote.handle = init_mDNS(false, ctx->host);
pthread_create(&ctx->active_remote.thread, NULL, &search_remote, ctx);
@@ -580,37 +570,14 @@ static bool handle_rtsp(raop_ctx_t *ctx, int sock)
unsigned short seqno = 0;
unsigned rtptime = 0;
char *p;
buf = kd_lookup(headers, "RTP-Info");
if ((p = strcasestr(buf, "seq")) != NULL) sscanf(p, "%*[^=]=%hu", &seqno);
if ((p = strcasestr(buf, "rtptime")) != NULL) sscanf(p, "%*[^=]=%u", &rtptime);
// only send FLUSH if useful (discards frames above buffer head and top)
if (ctx->rtp && rtp_flush(ctx->rtp, seqno, rtptime, true)) {
// only send FLUSH if useful (discards frames above buffer head and top)
if (ctx->rtp && rtp_flush(ctx->rtp, seqno, rtptime))
success = ctx->cmd_cb(RAOP_FLUSH);
} else if (!strcmp(method, "TEARDOWN")) {
rtp_end(ctx->rtp);
ctx->rtp = NULL;
// need to make sure no search is on-going and reclaim pthread memory
#ifdef WIN32
if (ctx->active_remote.handle) close_mDNS(ctx->active_remote.handle);
pthread_join(ctx->search_thread, NULL);
#else
ctx->active_remote.joiner = xTaskGetCurrentTaskHandle();
ctx->active_remote.running = false;
xSemaphoreTake(ctx->active_remote.destroy_mutex, portMAX_DELAY);
vTaskDelete(ctx->active_remote.thread);
vSemaphoreDelete(ctx->active_remote.thread);
heap_caps_free(ctx->active_remote.xTaskBuffer);
LOG_INFO("[%p]: mDNS search task terminated", ctx);
#endif
success = ctx->cmd_cb(RAOP_FLUSH);
rtp_flush_release(ctx->rtp);
}
@@ -631,18 +598,16 @@ static bool handle_rtsp(raop_ctx_t *ctx, int sock)
success = ctx->cmd_cb(RAOP_VOLUME, volume);
} else if (body && (p = strcasestr(body, "progress")) != NULL) {
int start, current, stop = 0;
// we want ms, not s
sscanf(p, "%*[^:]:%u/%u/%u", &start, &current, &stop);
current = ((current - start) / 44100) * 1000;
if (stop) stop = ((stop - start) / 44100) * 1000;
// we want ms, not s
sscanf(p, "%*[^:]:%u/%u/%u", &start, &current, &stop);
current = ((current - start) / 44100) * 1000;
if (stop) stop = ((stop - start) / 44100) * 1000;
else stop = -1;
LOG_INFO("[%p]: SET PARAMETER progress %d/%u %s", ctx, current, stop, p);
success = ctx->cmd_cb(RAOP_PROGRESS, max(current, 0), stop);
} else if (body && ((p = kd_lookup(headers, "Content-Type")) != NULL) && !strcasecmp(p, "application/x-dmap-tagged")) {
struct metadata_s metadata;
struct metadata_s metadata;
dmap_settings settings = {
NULL, NULL, NULL, NULL, NULL, NULL, NULL, on_dmap_string, NULL,
NULL
@@ -651,6 +616,10 @@ static bool handle_rtsp(raop_ctx_t *ctx, int sock)
LOG_INFO("[%p]: received metadata", ctx);
settings.ctx = &metadata;
memset(&metadata, 0, sizeof(struct metadata_s));
if (!dmap_parse(&settings, body, len)) {
LOG_INFO("[%p]: received metadata\n\tartist: %s\n\talbum: %s\n\ttitle: %s",
ctx, metadata.artist, metadata.album, metadata.title);
success = ctx->cmd_cb(RAOP_METADATA, metadata.artist, metadata.album, metadata.title);
free_metadata(&metadata);
}
} else {
@@ -678,26 +647,28 @@ static bool handle_rtsp(raop_ctx_t *ctx, int sock)
NFREE(body);
NFREE(buf);
kd_free(resp);
kd_free(headers);
kd_free(headers);
return true;
}
/*----------------------------------------------------------------------------*/
void abort_rtsp(raop_ctx_t *ctx) {
/*----------------------------------------------------------------------------*/
void cleanup_rtsp(raop_ctx_t *ctx, bool abort) {
// first stop RTP process
if (ctx->rtp) {
rtp_end(ctx->rtp);
ctx->rtp = NULL;
if (abort) LOG_INFO("[%p]: RTP thread aborted", ctx);
}
ctx->rtp = NULL;
if (ctx->active_remote.running) {
}
#ifdef WIN32
pthread_join(ctx->active_remote.thread, NULL);
close_mDNS(ctx->active_remote.handle);
#else
// need to make sure no search is on-going and reclaim task memory
ctx->active_remote.running = false;
xSemaphoreTake(ctx->active_remote.destroy_mutex, portMAX_DELAY);
xSemaphoreTake(ctx->active_remote.destroy_mutex, portMAX_DELAY);
vTaskDelete(ctx->active_remote.thread);
vSemaphoreDelete(ctx->active_remote.thread);
@@ -767,7 +738,7 @@ static void* search_remote(void *args) {
if (r) {
for (a = r->addr; a && a->addr.type != IPADDR_TYPE_V4; a = a->next);
if (a) {
found = true;
found = true;
ctx->active_remote.host.s_addr = a->addr.u_addr.ip4.addr;
ctx->active_remote.port = r->port;
LOG_INFO("found remote %s %s:%hu", r->instance_name, inet_ntoa(ctx->active_remote.host), ctx->active_remote.port);
@@ -947,9 +918,8 @@ static unsigned int token_decode(const char *token)
return DECODE_ERROR;
for (i = 0; i < 4; i++) {
val *= 64;
if (token[i] == '=')
marker++;
else if (marker > 0)
if (token[i] == '=')
marker++;
else if (marker > 0)
return DECODE_ERROR;
else

View File

@@ -38,6 +38,7 @@
#include <sys/stat.h>
#include <stdint.h>
#include <fcntl.h>
#include <assert.h>
#include "platform.h"
#include "rtp.h"
@@ -48,11 +49,10 @@
#ifdef WIN32
#include <openssl/aes.h>
#include "alac_wrapper.h"
#include <assert.h>
#define MSG_DONTWAIT 0
#else
#include "esp_pthread.h"
#include "esp_system.h"
#include "assert.h"
#include <mbedtls/version.h>
#include <mbedtls/aes.h>
#include "alac_wrapper.h"
@@ -268,25 +268,25 @@ rtp_resp_t rtp_init(struct in_addr host, int latency, char *aeskey, char *aesiv,
// create http port and start listening
resp.cport = ctx->rtp_sockets[CONTROL].lport;
resp.tport = ctx->rtp_sockets[TIMING].lport;
resp.aport = ctx->rtp_sockets[DATA].lport;
resp.tport = ctx->rtp_sockets[TIMING].lport;
resp.aport = ctx->rtp_sockets[DATA].lport;
ctx->running = true;
ctx->running = true;
#ifdef WIN32
pthread_create(&ctx->thread, NULL, rtp_thread_func, (void *) ctx);
#else
// xTaskCreate((TaskFunction_t) rtp_thread_func, "RTP_thread", RTP_TASK_SIZE, ctx, CONFIG_ESP32_PTHREAD_TASK_PRIO_DEFAULT + 1 , &ctx->thread);
ctx->xTaskBuffer = (StaticTask_t*) heap_caps_malloc(sizeof(StaticTask_t), MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT);
pthread_create(&ctx->thread, NULL, rtp_thread_func, (void *) ctx);
#else
ctx->xTaskBuffer = (StaticTask_t*) heap_caps_malloc(sizeof(StaticTask_t), MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT);
ctx->thread = xTaskCreateStatic( (TaskFunction_t) rtp_thread_func, "RTP_thread", RTP_STACK_SIZE, ctx,
CONFIG_ESP32_PTHREAD_TASK_PRIO_DEFAULT + 1, ctx->xStack, ctx->xTaskBuffer );
CONFIG_ESP32_PTHREAD_TASK_PRIO_DEFAULT + 1, ctx->xStack, ctx->xTaskBuffer );
#endif
// cleanup everything if we failed
if (!rc) {
LOG_ERROR("[%p]: cannot start RTP", ctx);
rtp_end(ctx);
ctx = NULL;
}
rtp_end(ctx);
ctx = NULL;
}
resp.ctx = ctx;
return resp;
@@ -328,7 +328,7 @@ void rtp_end(rtp_t *ctx)
fclose(ctx->rtpOUT);
#endif
}
/*---------------------------------------------------------------------------*/
bool rtp_flush(rtp_t *ctx, unsigned short seqno, unsigned int rtptime, bool exit_locked)
{
@@ -341,7 +341,7 @@ bool rtp_flush(rtp_t *ctx, unsigned short seqno, unsigned int rtptime)
} else {
pthread_mutex_lock(&ctx->ab_mutex);
buffer_reset(ctx->audio_buffer);
ctx->playing = false;
ctx->playing = false;
ctx->flush_seqno = seqno;
if (!exit_locked) pthread_mutex_unlock(&ctx->ab_mutex);
}
@@ -350,8 +350,13 @@ bool rtp_flush(rtp_t *ctx, unsigned short seqno, unsigned int rtptime)
return rc;
}
/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/
void rtp_flush_release(rtp_t *ctx) {
pthread_mutex_unlock(&ctx->ab_mutex);
}
/*---------------------------------------------------------------------------*/
void rtp_record(rtp_t *ctx, unsigned short seqno, unsigned rtptime) {
ctx->record.seqno = seqno;
@@ -444,25 +449,23 @@ static void buffer_put_packet(rtp_t *ctx, seq_t seqno, unsigned rtptime, bool fi
abuf = ctx->audio_buffer + BUFIDX(seqno);
ctx->ab_write = seqno;
LOG_SDEBUG("packet expected seqno:%hu rtptime:%u (W:%hu R:%hu)", seqno, rtptime, ctx->ab_write, ctx->ab_read);
} else if (seq_order(ctx->ab_write, seqno)) {
seq_t i;
u32_t now;
// newer than expected
if (ctx->latency && seq_order(ctx->latency / ctx->frame_size, seqno - ctx->ab_write - 1)) {
// only get rtp latency-1 frames back (last one is seqno)
LOG_WARN("[%p] too many missing frames %hu seq: %hu, (W:%hu R:%hu)", ctx, seqno - ctx->ab_write - 1, seqno, ctx->ab_write, ctx->ab_read);
ctx->ab_write = seqno - ctx->latency / ctx->frame_size;
}
if (ctx->latency && seq_order(ctx->latency / ctx->frame_size, seqno - ctx->ab_read)) {
// if ab_read is lagging more than http latency, advance it
LOG_WARN("[%p] on hold for too long %hu (W:%hu R:%hu)", ctx, seqno - ctx->ab_read + 1, ctx->ab_write, ctx->ab_read);
ctx->ab_read = seqno - ctx->latency / ctx->frame_size + 1;
}
if (rtp_request_resend(ctx, ctx->ab_write + 1, seqno-1)) {
seq_t i;
u32_t now = gettime_ms();
for (i = ctx->ab_write + 1; seq_order(i, seqno); i++) {
ctx->audio_buffer[BUFIDX(i)].rtptime = rtptime - (seqno-i)*ctx->frame_size;
ctx->ab_write = seqno - ctx->latency / ctx->frame_size;
}
// need to request re-send and adjust timing of gaps
rtp_request_resend(ctx, ctx->ab_write + 1, seqno-1);
for (now = gettime_ms(), i = ctx->ab_write + 1; seq_order(i, seqno); i++) {
ctx->audio_buffer[BUFIDX(i)].rtptime = rtptime - (seqno-i)*ctx->frame_size;
ctx->audio_buffer[BUFIDX(i)].last_resend = now;
}
LOG_DEBUG("[%p]: packet newer seqno:%hu rtptime:%u (W:%hu R:%hu)", ctx, seqno, rtptime, ctx->ab_write, ctx->ab_read);
@@ -520,14 +523,21 @@ static void buffer_push_packet(rtp_t *ctx) {
if (now > playtime) {
LOG_DEBUG("[%p]: discarded frame now:%u missed by:%d (W:%hu R:%hu)", ctx, now, now - playtime, ctx->ab_write, ctx->ab_read);
ctx->discarded++;
curframe->ready = 0;
} else if (playtime - now <= hold) {
if (curframe->ready) {
ctx->data_cb((const u8_t*) curframe->data, curframe->len, playtime);
curframe->ready = 0;
} else {
LOG_DEBUG("[%p]: created zero frame (W:%hu R:%hu)", ctx, ctx->ab_write, ctx->ab_read);
ctx->data_cb(silence_frame, ctx->frame_size * 4, playtime);
ctx->silent_frames++;
}
} else if (curframe->ready) {
ctx->data_cb((const u8_t*) curframe->data, curframe->len, playtime);
curframe->ready = 0;
} else if (playtime - now <= hold) {
LOG_DEBUG("[%p]: created zero frame (W:%hu R:%hu)", ctx, ctx->ab_write, ctx->ab_read);
ctx->data_cb(silence_frame, ctx->frame_size * 4, playtime);
ctx->data_cb((const u8_t*) curframe->data, curframe->len, playtime);
curframe->ready = 0;
} else {
break;
}
@@ -572,7 +582,7 @@ static void *rtp_thread_func(void *arg) {
ntp_sent = rtp_request_timing(ctx);
}
while (ctx->running) {
while (ctx->running) {
ssize_t plen;
char type;
socklen_t rtp_client_len = sizeof(struct sockaddr_in);
@@ -585,14 +595,18 @@ static void *rtp_thread_func(void *arg) {
if (select(sock + 1, &fds, NULL, NULL, &timeout) <= 0) continue;
for (i = 0; i < 3; i++)
for (i = 0; i < 3; i++)
if (FD_ISSET(ctx->rtp_sockets[i].sock, &fds)) idx = i;
plen = recvfrom(ctx->rtp_sockets[idx].sock, packet, MAX_PACKET, MSG_DONTWAIT, (struct sockaddr*) &ctx->rtp_host, &rtp_client_len);
if (!ntp_sent) {
LOG_WARN("[%p]: NTP request not send yet", ctx);
ntp_sent = rtp_request_timing(ctx);
ntp_sent = rtp_request_timing(ctx);
}
if (plen <= 0) {
LOG_WARN("Nothing received on a readable socket %d", plen);
continue;
}
@@ -638,7 +652,7 @@ static void *rtp_thread_func(void *arg) {
case 0x54: {
u32_t rtp_now_latency = ntohl(*(u32_t*)(pktp+4));
u64_t remote = (((u64_t) ntohl(*(u32_t*)(pktp+8))) << 32) + ntohl(*(u32_t*)(pktp+12));
u32_t rtp_now = ntohl(*(u32_t*)(pktp+16));
u32_t rtp_now = ntohl(*(u32_t*)(pktp+16));
u16_t flags = ntohs(*(u16_t*)(pktp+2));
u32_t remote_gap = NTP2MS(remote - ctx->timing.remote);
@@ -711,6 +725,11 @@ static void *rtp_thread_func(void *arg) {
LOG_DEBUG("[%p]: Timing references local:%llu, remote:%llx (delta:%lld, sum:%lld, adjust:%lld, gaps:%d)",
ctx, ctx->timing.local, ctx->timing.remote);
break;
}
default: {
LOG_WARN("Unknown packet received %x", (int) type);
break;
}
@@ -752,7 +771,7 @@ static bool rtp_request_timing(rtp_t *ctx) {
// no address from sender, need to wait for 1st packet to be received
if (host.sin_addr.s_addr == INADDR_ANY) return false;
host.sin_port = htons(ctx->rtp_sockets[TIMING].rport);
if (sizeof(req) != sendto(ctx->rtp_sockets[TIMING].sock, req, sizeof(req), MSG_DONTWAIT, (struct sockaddr*) &host, sizeof(host))) {
@@ -778,7 +797,7 @@ static bool rtp_request_resend(rtp_t *ctx, seq_t first, seq_t last) {
*(u16_t*)(req+2) = htons(1); // our seqnum
*(u16_t*)(req+4) = htons(first); // missed seqnum
*(u16_t*)(req+6) = htons(last-first+1); // count
ctx->rtp_host.sin_port = htons(ctx->rtp_sockets[CONTROL].rport);
if (sizeof(req) != sendto(ctx->rtp_sockets[CONTROL].sock, req, sizeof(req), MSG_DONTWAIT, (struct sockaddr*) &ctx->rtp_host, sizeof(ctx->rtp_host))) {

View File

@@ -14,7 +14,8 @@ rtp_resp_t rtp_init(struct in_addr host, int latency,
short unsigned pCtrlPort, short unsigned pTimingPort,
raop_cmd_cb_t cmd_cb, raop_data_cb_t data_cb);
void rtp_end(struct rtp_s *ctx);
bool rtp_flush(struct rtp_s *ctx, unsigned short seqno, unsigned rtptime);
bool rtp_flush(struct rtp_s *ctx, unsigned short seqno, unsigned rtptime, bool exit_locked);
void rtp_flush_release(struct rtp_s *ctx);
void rtp_record(struct rtp_s *ctx, unsigned short seqno, unsigned rtptime);
void rtp_metadata(struct rtp_s *ctx, struct metadata_s *metadata);

View File

@@ -401,7 +401,7 @@ bool http_parse(int sock, char *method, key_data_t *rkd, char **body, int *len)
}
if (*len) {
int size = 0;
int size = 0;
*body = malloc(*len + 1);
while (*body && size < *len) {