PYTHON_JOB_INTERVIEW Telegram 1150
📌 Сложная задача для собеседования на Python: «Асинхронный кэш с TTL и инвалидацией».

Условие:
Реализуйте потокобезопасный асинхронный кэш с временем жизни записей, поддержкой инвалидации по ключу и автоматическим удалением устаревших записей. Кэш должен:
1️⃣ Хранить значения не дольше указанного TTL (seconds)
2️⃣ Автоматически очищать устаревшие записи без блокировки основного потока
3️⃣ Поддерживать асинхронные операции get/set
4️⃣ Иметь механизм ручной инвалидации
5️⃣ Гарантировать потокобезопасность
6️⃣ Оптимизировать память

Ожидаемое решение:
import asyncio
import time
from collections import OrderedDict
from typing import Any, Optional
import threading

class AsyncTTLCache:
def __init__(self, maxsize: int = 1024, ttl: int = 60):
self._cache = OrderedDict()
self._maxsize = maxsize
self._ttl = ttl
self._lock = threading.Lock()
self._cleanup_task = asyncio.create_task(self._cleanup_expired())

async def get(self, key: str) -> Optional[Any]:
with self._lock:
if key not in self._cache:
return None
value, expiry = self._cache[key]
if time.time() > expiry:
del self._cache[key]
return None
# Move to end to mark as recently used
self._cache.move_to_end(key)
return value

async def set(self, key: str, value: Any) -> None:
with self._lock:
if key in self._cache:
self._cache.move_to_end(key)
self._cache[key] = (value, time.time() + self._ttl)
if len(self._cache) > self._maxsize:
self._cache.popitem(last=False)

async def invalidate(self, key: str) -> None:
with self._lock:
if key in self._cache:
del self._cache[key]

async def _cleanup_expired(self) -> None:
while True:
await asyncio.sleep(self._ttl)
with self._lock:
now = time.time()
expired_keys = [
k for k, (_, expiry) in self._cache.items()
if expiry <= now
]
for k in expired_keys:
del self._cache[k]

def __del__(self):
self._cleanup_task.cancel()


# Пример использования
async def main():
cache = AsyncTTLCache(ttl=2)

await cache.set("a", 1)
print(await cache.get("a")) # 1

await asyncio.sleep(3)
print(await cache.get("a")) # None

await cache.set("b", 2)
await cache.invalidate("b")
print(await cache.get("b")) # None

asyncio.run(main())

Предлагайте свои варианты решения в комментариях⏬️

@python_job_interview
🔥176👍5😱3



tgoop.com/python_job_interview/1150
Create:
Last Update:

📌 Сложная задача для собеседования на Python: «Асинхронный кэш с TTL и инвалидацией».

Условие:
Реализуйте потокобезопасный асинхронный кэш с временем жизни записей, поддержкой инвалидации по ключу и автоматическим удалением устаревших записей. Кэш должен:
1️⃣ Хранить значения не дольше указанного TTL (seconds)
2️⃣ Автоматически очищать устаревшие записи без блокировки основного потока
3️⃣ Поддерживать асинхронные операции get/set
4️⃣ Иметь механизм ручной инвалидации
5️⃣ Гарантировать потокобезопасность
6️⃣ Оптимизировать память

Ожидаемое решение:

import asyncio
import time
from collections import OrderedDict
from typing import Any, Optional
import threading

class AsyncTTLCache:
def __init__(self, maxsize: int = 1024, ttl: int = 60):
self._cache = OrderedDict()
self._maxsize = maxsize
self._ttl = ttl
self._lock = threading.Lock()
self._cleanup_task = asyncio.create_task(self._cleanup_expired())

async def get(self, key: str) -> Optional[Any]:
with self._lock:
if key not in self._cache:
return None
value, expiry = self._cache[key]
if time.time() > expiry:
del self._cache[key]
return None
# Move to end to mark as recently used
self._cache.move_to_end(key)
return value

async def set(self, key: str, value: Any) -> None:
with self._lock:
if key in self._cache:
self._cache.move_to_end(key)
self._cache[key] = (value, time.time() + self._ttl)
if len(self._cache) > self._maxsize:
self._cache.popitem(last=False)

async def invalidate(self, key: str) -> None:
with self._lock:
if key in self._cache:
del self._cache[key]

async def _cleanup_expired(self) -> None:
while True:
await asyncio.sleep(self._ttl)
with self._lock:
now = time.time()
expired_keys = [
k for k, (_, expiry) in self._cache.items()
if expiry <= now
]
for k in expired_keys:
del self._cache[k]

def __del__(self):
self._cleanup_task.cancel()


# Пример использования
async def main():
cache = AsyncTTLCache(ttl=2)

await cache.set("a", 1)
print(await cache.get("a")) # 1

await asyncio.sleep(3)
print(await cache.get("a")) # None

await cache.set("b", 2)
await cache.invalidate("b")
print(await cache.get("b")) # None

asyncio.run(main())

Предлагайте свои варианты решения в комментариях⏬️

@python_job_interview

BY Python вопросы с собеседований


Share with your friend now:
tgoop.com/python_job_interview/1150

View MORE
Open in Telegram


Telegram News

Date: |

6How to manage your Telegram channel? For crypto enthusiasts, there was the “gm” app, a self-described “meme app” which only allowed users to greet each other with “gm,” or “good morning,” a common acronym thrown around on Crypto Twitter and Discord. But the gm app was shut down back in September after a hacker reportedly gained access to user data. There have been several contributions to the group with members posting voice notes of screaming, yelling, groaning, and wailing in different rhythms and pitches. Calling out the “degenerate” community or the crypto obsessives that engage in high-risk trading, Co-founder of NFT renting protocol Rentable World emiliano.eth shared this group on his Twitter. He wrote: “hey degen, are you stressed? Just let it out all out. Voice only tg channel for screaming”. When choosing the right name for your Telegram channel, use the language of your target audience. The name must sum up the essence of your channel in 1-3 words. If you’re planning to expand your Telegram audience, it makes sense to incorporate keywords into your name.
from us


Telegram Python вопросы с собеседований
FROM American