From 79d1e0e5389bb314cf3a7f7265055114ae42c8ce Mon Sep 17 00:00:00 2001 From: Achmad Date: Sun, 17 May 2026 08:10:03 +0000 Subject: [PATCH] Add date/time range support to get_historical_candles - _fetch_candles now accepts explicit from_ts/to_ts timestamps; falls back to count-based window when omitted - get_historical_candles accepts from_date/to_date in YYYY-MM-DD, YYYY-MM-DD HH:MM, or YYYY-MM-DD HH:MM:SS formats (all UTC) - Yahoo Finance fallback switched from range= to period1/period2 so it respects the exact window in both modes - Added 11 tests covering date range, datetime, end-of-day bump, invalid format, count cap, and pattern detection toggle - gitignore uv.lock (Dockerfile uses pip, not uv sync) --- .gitignore | 3 +- server.py | 63 +++++++++++++++++++++--------- tests/test_server.py | 91 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 137 insertions(+), 20 deletions(-) diff --git a/.gitignore b/.gitignore index 4db4f85..65a659a 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ __pycache__/ .pytest_cache/ *.pyc .vscode/ -opencode.json \ No newline at end of file +opencode.json +uv.lock \ No newline at end of file diff --git a/server.py b/server.py index 1fd9b89..0bd8a71 100644 --- a/server.py +++ b/server.py @@ -311,13 +311,17 @@ def _fetch_candles( ticker: str, resolution: str = "D", count: int = 30, + from_ts: int | None = None, + to_ts: int | None = None, ) -> list[dict[str, Any]]: # Try TradingView chart API first sym = _resolve_symbol(ticker, "stocks") - to_ts = int(time.time()) - resolution_seconds = {"1": 60, "5": 300, "15": 900, "30": 1800, "60": 3600, - "240": 14400, "D": 86400, "W": 604800, "M": 2592000} - from_ts = to_ts - (count * resolution_seconds.get(resolution, 86400)) + if to_ts is None: + to_ts = int(time.time()) + if from_ts is None: + resolution_seconds = {"1": 60, "5": 300, "15": 900, "30": 1800, "60": 3600, + "240": 14400, "D": 86400, "W": 604800, "M": 2592000} + from_ts = to_ts - (count * resolution_seconds.get(resolution, 86400)) tv_urls = [ "https://chart-data.tradingview.com/history", @@ -349,24 +353,14 @@ def _fetch_candles( interval_map = {"1": "1m", "5": "5m", "15": "15m", "30": "30m", "60": "60m", "240": "4h", "D": "1d", "W": "1wk", "M": "1mo"} interval = interval_map.get(resolution, "1d") - range_map = {30: "1mo", 60: "3mo", 120: "6mo", 250: "1y", 500: "2y"} - y_range = "1mo" - for cnt, r in sorted(range_map.items()): - if count <= cnt: - y_range = r - break - else: - y_range = "5y" url = f"https://query1.finance.yahoo.com/v8/finance/chart/{yahoo_sym}" - params = {"interval": interval, "range": y_range} + params: dict[str, Any] = {"interval": interval, "period1": from_ts, "period2": to_ts} headers = {"User-Agent": TV_CHART_HEADERS["User-Agent"]} resp = requests.get(url, headers=headers, params=params, timeout=15) data = resp.json() result = data.get("chart", {}).get("result", [None])[0] - if not result: - result = data.get("chart", {}).get("result", [None])[0] if result: timestamps = result.get("timestamp", []) quotes = result.get("indicators", {}).get("quote", [None])[0] @@ -385,7 +379,7 @@ def _fetch_candles( "time": timestamps[i], "open": o, "high": h, "low": l, "close": c, "volume": v, }) - return candles[-count:] + return candles except requests.RequestException: pass @@ -494,6 +488,8 @@ def get_historical_candles( count: int = 30, market_type: str = "stocks", detect_patterns: bool = True, + from_date: str | None = None, + to_date: str | None = None, ) -> str: """Fetch historical OHLCV candlestick data for candle pattern and trend analysis. @@ -501,12 +497,40 @@ def get_historical_candles( ticker: Full ticker (e.g. "IDX:BBRI", "NASDAQ:AAPL", "IDX:BUMI"). resolution: Candle resolution — 1, 3, 5, 15, 30, 60, 240 (minutes), D (daily, default), W (weekly), M (monthly). - count: Number of candles to fetch (max 500, default 30). + count: Number of candles to fetch when no date range given (max 500, default 30). market_type: Market type (stocks, crypto, forex, etc.). detect_patterns: Whether to detect candlestick patterns (default True). + from_date: Start date/datetime (e.g. "2026-05-08" or "2026-05-17 14:00"). When + provided, fetches candles from this point instead of counting back from now. + to_date: End date/datetime (e.g. "2026-05-13" or "2026-05-17 14:05"). Defaults to + now when from_date is given but to_date is omitted. """ + import calendar + + def _parse_dt(s: str, end_of_period: bool = False) -> int: + for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"): + try: + t = time.strptime(s, fmt) + ts = int(calendar.timegm(t)) + # For date-only strings, end_of_period bumps to 23:59:59 + if end_of_period and fmt == "%Y-%m-%d": + ts += 86399 + return ts + except ValueError: + continue + raise ValueError(f"Unrecognised date/datetime format: {s!r}. Use YYYY-MM-DD or YYYY-MM-DD HH:MM") + + from_ts: int | None = None + to_ts: int | None = None + if from_date: + from_ts = _parse_dt(from_date) + if to_date: + to_ts = _parse_dt(to_date, end_of_period=True) + elif from_date: + to_ts = int(time.time()) + count = min(count, 500) - candles = _fetch_candles(ticker, resolution, count) + candles = _fetch_candles(ticker, resolution, count, from_ts=from_ts, to_ts=to_ts) if not candles: return f"No historical data returned for {ticker}" @@ -518,7 +542,8 @@ def get_historical_candles( avg_vol = sum(c["volume"] for c in candles) / len(candles) lines: list[str] = [] - lines.append(f"📊 {ticker} — {count}candle chart ({resolution} resolution)") + candle_label = f"{from_date} → {to_date or 'now'}" if from_date else f"{len(candles)} candles" + lines.append(f"📊 {ticker} — {candle_label} ({resolution} resolution)") lines.append(f" Period: {time.strftime('%Y-%m-%d', time.gmtime(first['time']))}" f" → {time.strftime('%Y-%m-%d', time.gmtime(last['time']))}") lines.append(f" Latest: O={last['open']:.2f} H={last['high']:.2f} L={last['low']:.2f} C={last['close']:.2f}") diff --git a/tests/test_server.py b/tests/test_server.py index d83ffc5..79abdd2 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -1,6 +1,8 @@ from __future__ import annotations +import calendar import os +import time from unittest.mock import patch import pytest @@ -15,6 +17,7 @@ from server import ( find_top_gainers, find_top_losers, fundamental_scan, + get_historical_candles, get_stock_quotes, list_fields, list_markets, @@ -468,3 +471,91 @@ class TestSessionTools: server._session_cookies = None result = clear_session() assert "cleared" in result + + +# --------------------------------------------------------------------------- +# get_historical_candles +# --------------------------------------------------------------------------- + +_MOCK_CANDLES = [ + {"time": 1746662400, "open": 100.0, "high": 110.0, "low": 95.0, "close": 105.0, "volume": 1_000_000}, + {"time": 1746748800, "open": 105.0, "high": 115.0, "low": 100.0, "close": 108.0, "volume": 1_200_000}, + {"time": 1746835200, "open": 108.0, "high": 112.0, "low": 104.0, "close": 106.0, "volume": 900_000}, +] + + +class TestGetHistoricalCandles: + @patch("server._fetch_candles", return_value=_MOCK_CANDLES) + def test_basic_count_based(self, mock_fetch): + result = get_historical_candles("NASDAQ:AAPL", resolution="D", count=3) + mock_fetch.assert_called_once_with("NASDAQ:AAPL", "D", 3, from_ts=None, to_ts=None) + assert "NASDAQ:AAPL" in result + assert "3 candles" in result + + @patch("server._fetch_candles", return_value=_MOCK_CANDLES) + def test_from_date_only(self, mock_fetch): + result = get_historical_candles("NASDAQ:AAPL", from_date="2026-05-08") + kw = mock_fetch.call_args.kwargs + expected_from = int(calendar.timegm(time.strptime("2026-05-08", "%Y-%m-%d"))) + assert kw["from_ts"] == expected_from + assert kw["to_ts"] is not None # defaults to now + assert "2026-05-08" in result + + @patch("server._fetch_candles", return_value=_MOCK_CANDLES) + def test_from_and_to_date(self, mock_fetch): + result = get_historical_candles("NASDAQ:AAPL", from_date="2026-05-08", to_date="2026-05-13") + kw = mock_fetch.call_args.kwargs + expected_from = int(calendar.timegm(time.strptime("2026-05-08", "%Y-%m-%d"))) + expected_to = int(calendar.timegm(time.strptime("2026-05-13", "%Y-%m-%d"))) + 86399 + assert kw["from_ts"] == expected_from + assert kw["to_ts"] == expected_to + assert "2026-05-08 → 2026-05-13" in result + + @patch("server._fetch_candles", return_value=_MOCK_CANDLES) + def test_datetime_strings(self, mock_fetch): + get_historical_candles("NASDAQ:AAPL", from_date="2026-05-17 14:00", to_date="2026-05-17 14:05") + kw = mock_fetch.call_args.kwargs + expected_from = int(calendar.timegm(time.strptime("2026-05-17 14:00", "%Y-%m-%d %H:%M"))) + expected_to = int(calendar.timegm(time.strptime("2026-05-17 14:05", "%Y-%m-%d %H:%M"))) + assert kw["from_ts"] == expected_from + assert kw["to_ts"] == expected_to + + @patch("server._fetch_candles", return_value=_MOCK_CANDLES) + def test_datetime_with_seconds(self, mock_fetch): + get_historical_candles("NASDAQ:AAPL", from_date="2026-05-17 14:00:30", to_date="2026-05-17 14:05:00") + kw = mock_fetch.call_args.kwargs + expected_from = int(calendar.timegm(time.strptime("2026-05-17 14:00:30", "%Y-%m-%d %H:%M:%S"))) + assert kw["from_ts"] == expected_from + + @patch("server._fetch_candles", return_value=_MOCK_CANDLES) + def test_date_only_to_date_end_of_day(self, mock_fetch): + # date-only to_date should be bumped to end of that day (23:59:59) + get_historical_candles("NASDAQ:AAPL", from_date="2026-05-08", to_date="2026-05-13") + kw = mock_fetch.call_args.kwargs + midnight = int(calendar.timegm(time.strptime("2026-05-13", "%Y-%m-%d"))) + assert kw["to_ts"] == midnight + 86399 + + @patch("server._fetch_candles", return_value=[]) + def test_no_data_returned(self, mock_fetch): + result = get_historical_candles("NASDAQ:AAPL") + assert "No historical data" in result + + @patch("server._fetch_candles", return_value=_MOCK_CANDLES) + def test_invalid_date_format_raises(self, mock_fetch): + with pytest.raises(ValueError, match="Unrecognised date"): + get_historical_candles("NASDAQ:AAPL", from_date="05/08/2026") + + @patch("server._fetch_candles", return_value=_MOCK_CANDLES) + def test_count_capped_at_500(self, mock_fetch): + get_historical_candles("NASDAQ:AAPL", count=9999) + assert mock_fetch.call_args.args[2] == 500 + + @patch("server._fetch_candles", return_value=_MOCK_CANDLES) + def test_detect_patterns_included_by_default(self, mock_fetch): + result = get_historical_candles("NASDAQ:AAPL") + assert "Pattern Detection" in result + + @patch("server._fetch_candles", return_value=_MOCK_CANDLES) + def test_detect_patterns_disabled(self, mock_fetch): + result = get_historical_candles("NASDAQ:AAPL", detect_patterns=False) + assert "Pattern Detection" not in result