catching up (trying to) wiht CSpot

This commit is contained in:
Philippe G
2022-01-06 18:46:57 -08:00
parent 491d0d260d
commit 9af4cd5b23
57 changed files with 2165 additions and 343 deletions

View File

@@ -10,16 +10,17 @@ AudioChunk::AudioChunk(uint16_t seqId, std::vector<uint8_t> &audioKey, uint32_t
this->startPosition = startPosition;
this->endPosition = predictedEndPosition;
this->decryptedData = std::vector<uint8_t>();
this->isHeaderFileSizeLoadedSemaphore = std::make_unique<WrappedSemaphore>(2);
this->isLoadedSemaphore = std::make_unique<WrappedSemaphore>(2);
this->isHeaderFileSizeLoadedSemaphore = std::make_unique<WrappedSemaphore>(5);
this->isLoadedSemaphore = std::make_unique<WrappedSemaphore>(5);
}
AudioChunk::~AudioChunk()
{
}
void AudioChunk::appendData(std::vector<uint8_t> &data)
void AudioChunk::appendData(const std::vector<uint8_t> &data)
{
//if (this == nullptr) return;
this->decryptedData.insert(this->decryptedData.end(), data.begin(), data.end());
}

View File

@@ -3,7 +3,7 @@
#include "Logger.h"
AudioChunkManager::AudioChunkManager()
: bell::Task("AudioChunkManager", 4 * 1024, +1, 0) {
: bell::Task("AudioChunkManager", 4 * 1024, 2, 0) {
this->chunks = std::vector<std::shared_ptr<AudioChunk>>();
startTask();
}
@@ -12,6 +12,7 @@ std::shared_ptr<AudioChunk>
AudioChunkManager::registerNewChunk(uint16_t seqId,
std::vector<uint8_t> &audioKey,
uint32_t startPos, uint32_t endPos) {
std::scoped_lock lock(chunkMutex);
auto chunk =
std::make_shared<AudioChunk>(seqId, audioKey, startPos * 4, endPos * 4);
this->chunks.push_back(chunk);
@@ -26,6 +27,7 @@ void AudioChunkManager::handleChunkData(std::vector<uint8_t> &data,
}
void AudioChunkManager::failAllChunks() {
std::scoped_lock lock(chunkMutex);
// Enumerate all the chunks and mark em all failed
for (auto const &chunk : this->chunks) {
if (!chunk->isLoaded) {
@@ -47,9 +49,10 @@ void AudioChunkManager::close() {
void AudioChunkManager::runTask() {
std::scoped_lock lock(this->runningMutex);
this->isRunning = true;
std::pair<std::vector<uint8_t>, bool> audioPair;
while (isRunning) {
std::pair<std::vector<uint8_t>, bool> audioPair;
if (this->audioChunkDataQueue.wtpop(audioPair, 100)) {
std::scoped_lock lock(this->chunkMutex);
auto data = audioPair.first;
auto failed = audioPair.second;
uint16_t seqId = ntohs(extract<uint16_t>(data, 0));
@@ -57,7 +60,7 @@ void AudioChunkManager::runTask() {
// Erase all chunks that are not referenced elsewhere anymore
chunks.erase(
std::remove_if(chunks.begin(), chunks.end(),
[](const std::shared_ptr<AudioChunk> &chunk) {
[](std::shared_ptr<AudioChunk>& chunk) {
return chunk.use_count() == 1;
}),
chunks.end());
@@ -67,7 +70,7 @@ void AudioChunkManager::runTask() {
// Found the right chunk
if (chunk != nullptr && chunk->seqId == seqId) {
if (failed) {
// chunk->isFailed = true;
chunk->isFailed = true;
chunk->startPosition = 0;
chunk->endPosition = 0;
chunk->isHeaderFileSizeLoadedSemaphore->give();
@@ -96,9 +99,6 @@ void AudioChunkManager::runTask() {
break;
default:
if (chunk.get() == nullptr) {
return;
}
auto actualData = std::vector<uint8_t>(
data.begin() + 2, data.end());
chunk->appendData(actualData);
@@ -109,8 +109,6 @@ void AudioChunkManager::runTask() {
} catch (...) {
}
} else {
usleep(100);
}
}

View File

@@ -4,9 +4,11 @@
static size_t vorbisReadCb(void *ptr, size_t size, size_t nmemb, ChunkedAudioStream *self)
{
auto data = self->read(nmemb);
std::copy(data.begin(), data.end(), (char *)ptr);
return data.size();
size_t readSize = 0;
while (readSize < nmemb * size && self->byteStream->position() < self->byteStream->size()) {
readSize += self->byteStream->read((uint8_t *) ptr + readSize, (size * nmemb) - readSize);
}
return readSize;
}
static int vorbisCloseCb(ChunkedAudioStream *self)
{
@@ -29,7 +31,7 @@ static int vorbisSeekCb(ChunkedAudioStream *self, int64_t offset, int whence)
static long vorbisTellCb(ChunkedAudioStream *self)
{
return static_cast<long>(self->pos);
return static_cast<long>(self->byteStream->position());
}
ChunkedAudioStream::~ChunkedAudioStream()
@@ -38,22 +40,22 @@ ChunkedAudioStream::~ChunkedAudioStream()
ChunkedAudioStream::ChunkedAudioStream(std::vector<uint8_t> fileId, std::vector<uint8_t> audioKey, uint32_t duration, std::shared_ptr<MercuryManager> manager, uint32_t startPositionMs, bool isPaused)
{
this->audioKey = audioKey;
this->duration = duration;
this->manager = manager;
this->fileId = fileId;
this->startPositionMs = startPositionMs;
this->isPaused = isPaused;
auto beginChunk = manager->fetchAudioChunk(fileId, audioKey, 0, 0x4000);
beginChunk->keepInMemory = true;
while(beginChunk->isHeaderFileSizeLoadedSemaphore->twait() != 0);
this->fileSize = beginChunk->headerFileSize;
chunks.push_back(beginChunk);
// File size is required for this packet to be downloaded
this->fetchTraillingPacket();
// auto beginChunk = manager->fetchAudioChunk(fileId, audioKey, 0, 0x4000);
// beginChunk->keepInMemory = true;
// while(beginChunk->isHeaderFileSizeLoadedSemaphore->twait() != 0);
// this->fileSize = beginChunk->headerFileSize;
// chunks.push_back(beginChunk);
//
// // File size is required for this packet to be downloaded
// this->fetchTraillingPacket();
this->byteStream = std::make_shared<ChunkedByteStream>(manager);
this->byteStream->setFileInfo(fileId, audioKey);
this->byteStream->fetchFileInformation();
vorbisFile = { };
vorbisCallbacks =
{
@@ -66,12 +68,11 @@ ChunkedAudioStream::ChunkedAudioStream(std::vector<uint8_t> fileId, std::vector<
void ChunkedAudioStream::seekMs(uint32_t positionMs)
{
byteStream->setEnableLoadAhead(false);
this->seekMutex.lock();
loadingMeta = true;
ov_time_seek(&vorbisFile, positionMs);
loadingMeta = false;
this->seekMutex.unlock();
byteStream->setEnableLoadAhead(true);
CSPOT_LOG(debug, "--- Finished seeking!");
}
@@ -79,33 +80,31 @@ void ChunkedAudioStream::seekMs(uint32_t positionMs)
void ChunkedAudioStream::startPlaybackLoop()
{
loadingMeta = true;
isRunning = true;
byteStream->setEnableLoadAhead(false);
int32_t r = ov_open_callbacks(this, &vorbisFile, NULL, 0, vorbisCallbacks);
CSPOT_LOG(debug, "--- Loaded file");
if (this->startPositionMs != 0)
{
ov_time_seek(&vorbisFile, startPositionMs);
}
else
{
this->requestChunk(0);
ov_time_seek(&vorbisFile, startPositionMs);
}
loadingMeta = false;
bool eof = false;
std::vector<uint8_t> pcmOut(4096 / 4);
byteStream->setEnableLoadAhead(true);
while (!eof && isRunning)
{
if (!isPaused)
{
std::vector<uint8_t> pcmOut(4096 / 4);
this->seekMutex.lock();
long ret = ov_read(&vorbisFile, (char *)&pcmOut[0], 4096 / 4, &currentSection);
this->seekMutex.unlock();
if (ret == 0)
{
CSPOT_LOG(info, "EOL");
// and done :)
eof = true;
}
@@ -139,193 +138,37 @@ void ChunkedAudioStream::startPlaybackLoop()
this->streamFinishedCallback();
}
}
void ChunkedAudioStream::fetchTraillingPacket()
{
auto startPosition = (this->fileSize / 4) - 0x1000;
// AES block size is 16, so the index must be divisible by it
while ((startPosition * 4) % 16 != 0)
startPosition++; // ik, ugly lol
auto endChunk = manager->fetchAudioChunk(fileId, audioKey, startPosition, fileSize / 4);
endChunk->keepInMemory = true;
chunks.push_back(endChunk);
while (endChunk->isLoadedSemaphore->twait() != 0);
}
std::vector<uint8_t> ChunkedAudioStream::read(size_t bytes)
{
auto toRead = bytes;
auto res = std::vector<uint8_t>();
READ:
while (res.size() < bytes)
{
auto position = pos;
auto isLoadingMeta = loadingMeta;
// Erase all chunks not close to current position
chunks.erase(std::remove_if(
chunks.begin(), chunks.end(),
[position, &isLoadingMeta](const std::shared_ptr<AudioChunk> &chunk) {
if (isLoadingMeta) {
return false;
}
if (chunk->keepInMemory)
{
return false;
}
if (chunk->isFailed)
{
return true;
}
if (chunk->endPosition < position || chunk->startPosition > position + BUFFER_SIZE)
{
return true;
}
return false;
}),
chunks.end());
int16_t chunkIndex = this->pos / AUDIO_CHUNK_SIZE;
int32_t offset = this->pos % AUDIO_CHUNK_SIZE;
if (pos >= fileSize)
{
CSPOT_LOG(debug, "EOL!");
return res;
}
auto chunk = findChunkForPosition(pos);
if (chunk != nullptr)
{
auto offset = pos - chunk->startPosition;
if (chunk->isLoaded)
{
if (chunk->decryptedData.size() - offset >= toRead)
{
if((chunk->decryptedData.begin() + offset) < chunk->decryptedData.end()) {
res.insert(res.end(), chunk->decryptedData.begin() + offset,
chunk->decryptedData.begin() + offset + toRead);
this->pos += toRead;
} else {
chunk->decrypt();
}
}
else
{
res.insert(res.end(), chunk->decryptedData.begin() + offset, chunk->decryptedData.end());
this->pos += chunk->decryptedData.size() - offset;
toRead -= chunk->decryptedData.size() - offset;
}
}
else
{
CSPOT_LOG(debug, "Waiting for chunk to load");
while (chunk->isLoadedSemaphore->twait() != 0);
if (chunk->isFailed)
{
auto requestChunk = this->requestChunk(chunkIndex);
while (requestChunk->isLoadedSemaphore->twait() != 0);
goto READ;
}
}
}
else
{
CSPOT_LOG(debug, "Actual request %d", chunkIndex);
this->requestChunk(chunkIndex);
}
}
if (!loadingMeta)
{
auto requestedOffset = 0;
while (requestedOffset < BUFFER_SIZE)
{
auto chunk = findChunkForPosition(pos + requestedOffset);
if (chunk != nullptr)
{
requestedOffset = chunk->endPosition - pos;
// Don not buffer over EOL - unnecessary "failed chunks"
if ((pos + requestedOffset) >= fileSize)
{
break;
}
}
else
{
auto chunkReq = manager->fetchAudioChunk(fileId, audioKey, (pos + requestedOffset) / 4, (pos + requestedOffset + AUDIO_CHUNK_SIZE) / 4);
CSPOT_LOG(debug, "Chunk req end pos %d", chunkReq->endPosition);
this->chunks.push_back(chunkReq);
}
}
}
return res;
}
std::shared_ptr<AudioChunk> ChunkedAudioStream::findChunkForPosition(size_t position)
{
for (int i = 0; i < this->chunks.size(); i++)
{
auto chunk = this->chunks[i];
if (chunk->startPosition <= position && chunk->endPosition > position)
{
return chunk;
}
}
return nullptr;
}
//
//void ChunkedAudioStream::fetchTraillingPacket()
//{
// auto startPosition = (this->fileSize / 4) - 0x1000;
//
// // AES block size is 16, so the index must be divisible by it
// while ((startPosition * 4) % 16 != 0)
// startPosition++; // ik, ugly lol
//
// auto endChunk = manager->fetchAudioChunk(fileId, audioKey, startPosition, fileSize / 4);
// endChunk->keepInMemory = true;
//
// chunks.push_back(endChunk);
// while (endChunk->isLoadedSemaphore->twait() != 0);
//}
void ChunkedAudioStream::seek(size_t dpos, Whence whence)
{
BELL_LOG(info, "cspot", "%d", dpos);
auto seekPos = 0;
switch (whence)
{
case Whence::START:
this->pos = dpos;
seekPos = dpos;
break;
case Whence::CURRENT:
this->pos += dpos;
seekPos = byteStream->position() + dpos;
break;
case Whence::END:
this->pos = fileSize + dpos;
seekPos = byteStream->size() + dpos;
break;
}
auto currentChunk = this->pos / AUDIO_CHUNK_SIZE;
if (findChunkForPosition(this->pos) == nullptr)
{
// Seeking might look back - therefore we preload some past data
auto startPosition = (this->pos / 4) - (AUDIO_CHUNK_SIZE / 4);
// AES block size is 16, so the index must be divisible by it
while ((startPosition * 4) % 16 != 0)
startPosition++; // ik, ugly lol
this->chunks.push_back(manager->fetchAudioChunk(fileId, audioKey, startPosition, startPosition + (AUDIO_CHUNK_SIZE / 4)));
}
CSPOT_LOG(debug, "Change in current chunk %d", currentChunk);
}
std::shared_ptr<AudioChunk> ChunkedAudioStream::requestChunk(size_t chunkIndex)
{
CSPOT_LOG(debug, "Chunk Req %d", chunkIndex);
auto chunk = manager->fetchAudioChunk(fileId, audioKey, chunkIndex);
this->chunks.push_back(chunk);
return chunk;
}
byteStream->seek(seekPos);
}

View File

@@ -9,7 +9,7 @@ std::map<MercuryType, std::string> MercuryTypeMap({
{MercuryType::UNSUB, "UNSUB"},
});
MercuryManager::MercuryManager(std::unique_ptr<Session> session): bell::Task("mercuryManager", 6 * 1024, +1, 1)
MercuryManager::MercuryManager(std::unique_ptr<Session> session): bell::Task("mercuryManager", 6 * 1024, 2, 1)
{
tempMercuryHeader = Header_init_default;
this->timeProvider = std::make_shared<TimeProvider>();
@@ -30,7 +30,7 @@ MercuryManager::MercuryManager(std::unique_ptr<Session> session): bell::Task("me
MercuryManager::~MercuryManager()
{
pb_release(Header_fields, tempMercuryHeader);
//pb_release(Header_fields, &tempMercuryHeader);
}
bool MercuryManager::timeoutHandler()
@@ -177,7 +177,6 @@ void MercuryManager::runTask()
}
if (static_cast<MercuryType>(packet->command) == MercuryType::PING) // @TODO: Handle time synchronization through ping
{
CSPOT_LOG(debug, "Got ping, syncing timestamp");
this->timeProvider->syncWithPingPacket(packet->data);
this->lastPingTimestamp = this->timeProvider->getSyncedTimestamp();

View File

@@ -9,7 +9,7 @@ MercuryResponse::MercuryResponse(std::vector<uint8_t> &data)
}
MercuryResponse::~MercuryResponse() {
pb_release(Header_fields, mercuryHeader);
pb_release(Header_fields, &mercuryHeader);
}
void MercuryResponse::parseResponse(std::vector<uint8_t> &data)
@@ -34,6 +34,6 @@ void MercuryResponse::parseResponse(std::vector<uint8_t> &data)
pos += 2 + partSize;
}
pb_release(Header_fields, this->mercuryHeader);
pb_release(Header_fields, &this->mercuryHeader);
pbDecode(this->mercuryHeader, Header_fields, headerBytes);
}

View File

@@ -104,6 +104,7 @@ std::vector<uint8_t> PlainConnection::readBlock(size_t size)
std::vector<uint8_t> buf(size);
unsigned int idx = 0;
ssize_t n;
int retries = 0;
// printf("START READ\n");
while (idx < size)
@@ -124,7 +125,8 @@ std::vector<uint8_t> PlainConnection::readBlock(size_t size)
case EINTR:
break;
default:
throw std::runtime_error("Corn");
if (retries++ > 4) throw std::runtime_error("Error in read");
}
}
idx += n;
@@ -138,6 +140,7 @@ size_t PlainConnection::writeBlock(const std::vector<uint8_t> &data)
unsigned int idx = 0;
ssize_t n;
// printf("START WRITE\n");
int retries = 0;
while (idx < data.size())
{
@@ -156,7 +159,7 @@ size_t PlainConnection::writeBlock(const std::vector<uint8_t> &data)
case EINTR:
break;
default:
throw std::runtime_error("Corn");
if (retries++ > 4) throw std::runtime_error("Error in write");
}
}
idx += n;

View File

@@ -68,7 +68,7 @@ void Player::feedPCM(std::vector<uint8_t>& data)
}
}
this->audioSink->feedPCMFrames(data);
this->audioSink->feedPCMFrames(data.data(), data.size());
}
void Player::runTask()

View File

@@ -53,8 +53,9 @@ PlayerState::PlayerState(std::shared_ptr<TimeProvider> timeProvider)
}
PlayerState::~PlayerState() {
pb_release(Frame_fields, innerFrame);
pb_release(Frame_fields, remoteFrame);
pb_release(Frame_fields, &remoteFrame);
// do not destruct inner frame as it is never allocated
// pb_release(Frame_fields, &innerFrame);
}
void PlayerState::setPlaybackState(const PlaybackState state)
@@ -136,7 +137,7 @@ void PlayerState::updatePositionMs(uint32_t position)
void PlayerState::updateTracks()
{
CSPOT_LOG(info, "---- Track count %d", remoteFrame.state.track_count);
innerFrame.state.context_uri = remoteFrame.state.context_uri == nullptr ? nullptr : strdup(remoteFrame.state.context_uri);
//innerFrame.state.context_uri = remoteFrame.state.context_uri == nullptr ? nullptr : strdup(remoteFrame.state.context_uri);
std::copy(std::begin(remoteFrame.state.track), std::end(remoteFrame.state.track), std::begin(innerFrame.state.track));
innerFrame.state.track_count = remoteFrame.state.track_count;
innerFrame.state.has_playing_track_index = true;

View File

@@ -6,10 +6,10 @@ using random_bytes_engine = std::independent_bits_engine<std::default_random_eng
Session::Session()
{
this->clientHello = ClientHello_init_default;
this->apResponse = APResponseMessage_init_default;
this->authRequest = ClientResponseEncrypted_init_default;
this->clientResPlaintext = ClientResponsePlaintext_init_default;
this->clientHello = {};
this->apResponse = {};
this->authRequest = {};
this->clientResPlaintext = {};
// Generates the public and priv key
this->crypto = std::make_unique<Crypto>();
@@ -18,10 +18,9 @@ Session::Session()
Session::~Session()
{
pb_release(ClientHello_fields, clientHello);
pb_release(APResponseMessage_fields, apResponse);
pb_release(ClientResponseEncrypted_fields, authRequest);
pb_release(ClientResponsePlaintext_fields, clientResPlaintext);
pb_release(ClientHello_fields, &clientHello);
pb_release(APResponseMessage_fields, &apResponse);
pb_release(ClientResponsePlaintext_fields, &clientResPlaintext);
}
void Session::connect(std::unique_ptr<PlainConnection> connection)
@@ -60,6 +59,7 @@ std::vector<uint8_t> Session::authenticate(std::shared_ptr<LoginBlob> blob)
authRequest.version_string = (char *)versionString;
auto data = pbEncode(ClientResponseEncrypted_fields, &authRequest);
free(authRequest.login_credentials.auth_data);
// Send login request
this->shanConn->sendPacket(LOGIN_REQUEST_COMMAND, data);
@@ -72,7 +72,7 @@ std::vector<uint8_t> Session::authenticate(std::shared_ptr<LoginBlob> blob)
CSPOT_LOG(debug, "Authorization successful");
// @TODO store the reusable credentials
// PBWrapper<APWelcome> welcomePacket(packet->data)
// PBWrapper<APWelcome> welcomePacket(packet->data)
return std::vector<uint8_t>({0x1}); // TODO: return actual reusable credentaials to be stored somewhere
break;
}
@@ -96,7 +96,7 @@ void Session::processAPHelloResponse(std::vector<uint8_t> &helloPacket)
// Decode the response
auto skipSize = std::vector<uint8_t>(data.begin() + 4, data.end());
pb_release(APResponseMessage_fields, apResponse);
pb_release(APResponseMessage_fields, &apResponse);
pbDecode(apResponse, APResponseMessage_fields, skipSize);
auto diffieKey = std::vector<uint8_t>(apResponse.challenge.login_crypto_challenge.diffie_hellman.gs, apResponse.challenge.login_crypto_challenge.diffie_hellman.gs + 96);

View File

@@ -103,7 +103,7 @@ void SpircController::prevSong() {
}
void SpircController::handleFrame(std::vector<uint8_t> &data) {
pb_release(Frame_fields, state->remoteFrame);
//pb_release(Frame_fields, &state->remoteFrame);
pbDecode(state->remoteFrame, Frame_fields, data);
switch (state->remoteFrame.typ) {

View File

@@ -35,8 +35,8 @@ SpotifyTrack::~SpotifyTrack()
{
this->manager->unregisterMercuryCallback(this->reqSeqNum);
this->manager->freeAudioKeyCallback();
pb_release(Track_fields, this->trackInfo);
pb_release(Episode_fields, this->episodeInfo);
pb_release(Track_fields, &this->trackInfo);
pb_release(Episode_fields, &this->episodeInfo);
}
bool SpotifyTrack::countryListContains(std::string countryList, std::string country)
@@ -75,7 +75,7 @@ void SpotifyTrack::trackInformationCallback(std::unique_ptr<MercuryResponse> res
return;
CSPOT_ASSERT(response->parts.size() > 0, "response->parts.size() must be greater than 0");
pb_release(Track_fields, trackInfo);
pb_release(Track_fields, &trackInfo);
pbDecode(trackInfo, Track_fields, response->parts[0]);
CSPOT_LOG(info, "Track name: %s", trackInfo.name);
@@ -127,7 +127,7 @@ void SpotifyTrack::episodeInformationCallback(std::unique_ptr<MercuryResponse> r
return;
CSPOT_LOG(debug, "Got to episode");
CSPOT_ASSERT(response->parts.size() > 0, "response->parts.size() must be greater than 0");
pb_release(Episode_fields, episodeInfo);
pb_release(Episode_fields, &episodeInfo);
pbDecode(episodeInfo, Episode_fields, response->parts[0]);
CSPOT_LOG(info, "--- Episode name: %s", episodeInfo.name);

View File

@@ -19,8 +19,9 @@ TrackReference::TrackReference(TrackRef *ref)
TrackReference::~TrackReference()
{
pb_release(TrackRef_fields, ref);
}
//pb_release(TrackRef_fields, &ref);
//pbFree(TrackRef_fields, &ref);
}
std::vector<uint8_t> TrackReference::base62Decode(std::string uri)
{