Skip to content

Commit

Permalink
rollback to older prepare_hlcvs function
Browse files Browse the repository at this point in the history
  • Loading branch information
enarjord committed Oct 19, 2024
1 parent b1e108a commit 982d871
Showing 1 changed file with 65 additions and 93 deletions.
158 changes: 65 additions & 93 deletions src/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,124 +474,96 @@ async def prepare_hlcvs(config: dict):
base_dir = config["backtest"]["base_dir"]
exchange = config["backtest"]["exchange"]
minimum_coin_age_days = config["live"]["minimum_coin_age_days"]

ms2day = 1000 * 60 * 60 * 24
if end_date in ["today", "now", "", None]:
end_ts = (utc_ms() - ms2day) // ms2day * ms2day
end_date = ts_to_date_utc(end_ts)[:10]
else:
end_ts = date_to_ts2(end_date) // ms2day * ms2day

hlcvsd = {}
interval_ms = 60000
start_tss = None
if exchange == "binance":
start_tss = await get_first_ohlcv_timestamps(cc=ccxt.binanceusdm(), symbols=symbols)
elif exchange == "bybit":
start_tss = await get_first_ohlcv_timestamps(cc=ccxt.bybit(), symbols=symbols)
else:
raise Exception("failed to load start timestamps")

# Calculate global start and end times
global_start_ts = date_to_ts2(start_date)
global_end_ts = end_ts
n_timesteps = int((global_end_ts - global_start_ts) / interval_ms) + 1

# Pre-allocate the unified array
n_coins = len(symbols)
unified_array = np.zeros((n_timesteps, n_coins, 4), dtype=np.float64)

# Create a partial function with fixed arguments
process_symbol_partial = partial(
process_symbol,
start_date=start_date,
end_date=end_date,
base_dir=base_dir,
exchange=exchange,
minimum_coin_age_days=minimum_coin_age_days,
global_start_ts=global_start_ts,
global_end_ts=global_end_ts,
interval_ms=interval_ms,
start_tss=start_tss,
)

# Use asyncio.gather for concurrent processing
tasks = [process_symbol_partial(symbol) for symbol in symbols]
results = await asyncio.gather(*tasks)

# Process results and update unified_array
for i, (symbol, data) in enumerate(zip(symbols, results)):
if data is not None:
start_idx, end_idx, coin_data = data
unified_array[start_idx:end_idx, i, :] = coin_data
for symbol in symbols:
adjusted_start_ts = date_to_ts2(start_date)
if minimum_coin_age_days > 0.0:
min_coin_age_ms = 1000 * 60 * 60 * 24 * minimum_coin_age_days
if symbol not in start_tss:
print(f"coin {symbol} missing from first timestamps, skipping")
continue
new_start_ts = start_tss[symbol] + min_coin_age_ms
if new_start_ts >= end_ts:
print(
f"Coin {symbol} too young, start date {ts_to_date_utc(start_tss[symbol])}, skipping"
)
continue
if new_start_ts > adjusted_start_ts:
print(
f"First date for {symbol} was {ts_to_date_utc(start_tss[symbol])}. Adjusting start date to {ts_to_date_utc(new_start_ts)}"
)
adjusted_start_ts = new_start_ts
data = await load_hlcvs(
symbol,
ts_to_date_utc(adjusted_start_ts)[:10],
end_date,
base_dir,
exchange,
)
if len(data) == 0:
continue
assert (
np.diff(data[:, 0]) == interval_ms
).all(), f"gaps in hlcv data {symbol}" # verify integrous 1m hlcvs
hlcvsd[symbol] = data
symbols = sorted(hlcvsd.keys())
if len(symbols) > 1:
print(f"Unifying data for {len(symbols)} coins into single numpy array...")
timestamps, unified_data = unify_hlcv_data([hlcvsd[s] for s in symbols])
return symbols, timestamps, unified_data

# Front-fill
if start_idx > 0:
unified_array[:start_idx, i, :3] = coin_data[0, 2]

# Back-fill
if end_idx < n_timesteps:
unified_array[end_idx:, i, :3] = coin_data[-1, 2]
def unify_hlcv_data(hlcv_list) -> (np.ndarray, np.ndarray):

print(f"Finished fetching all data. Returning unified array.")
# Find the global start and end timestamps
start_time = min(arr[0, 0] for arr in hlcv_list)
end_time = max(arr[-1, 0] for arr in hlcv_list)

timestamps = np.arange(global_start_ts, global_end_ts + interval_ms, interval_ms)
# Calculate the number of timesteps
n_timesteps = int((end_time - start_time) / 60000) + 1

return symbols, timestamps, unified_array
# Create the unified array
n_coins = len(hlcv_list)
unified_array = np.zeros((n_timesteps, n_coins, 4))

# Create the timestamp array
timestamps = np.arange(start_time, end_time + 60000, 60000)

async def process_symbol(
symbol,
start_date,
end_date,
base_dir,
exchange,
minimum_coin_age_days,
global_start_ts,
global_end_ts,
interval_ms,
start_tss,
):
if symbol not in start_tss:
print(f"coin {symbol} missing from first timestamps, skipping")
return None
for i, ohlcv in enumerate(hlcv_list):
# Calculate the start and end indices for this coin
start_idx = int((ohlcv[0, 0] - start_time) / 60000)
end_idx = start_idx + len(ohlcv)

adjusted_start_ts = global_start_ts
if minimum_coin_age_days > 0.0:
min_coin_age_ms = 1000 * 60 * 60 * 24 * minimum_coin_age_days
new_start_ts = start_tss[symbol] + min_coin_age_ms
if new_start_ts >= global_end_ts:
print(
f"Coin {symbol} too young, start date {ts_to_date_utc(start_tss[symbol])}, skipping"
)
return None
if new_start_ts > adjusted_start_ts:
print(
f"First date for {symbol} was {ts_to_date_utc(start_tss[symbol])}. Adjusting start date to {ts_to_date_utc(new_start_ts)}"
)
adjusted_start_ts = max(adjusted_start_ts, new_start_ts)

data = await load_hlcvs(
symbol,
ts_to_date_utc(adjusted_start_ts)[:10],
end_date,
base_dir,
exchange,
)
# Extract the required data (high, low, close, volume)
coin_data = ohlcv[:, 1:]

if len(data) == 0:
return None
# Use quote volume as volume
coin_data[:, 3] = coin_data[:, 2] * coin_data[:, 3]

assert (np.diff(data[:, 0]) == interval_ms).all(), f"gaps in hlcv data {symbol}"
# Place the data in the unified array
unified_array[start_idx:end_idx, i, :] = coin_data

# Calculate indices for this coin's data
start_idx = int((data[0, 0] - global_start_ts) / interval_ms)
end_idx = start_idx + len(data)
# Front-fill
if start_idx > 0:
unified_array[:start_idx, i, :3] = coin_data[0, 2] # Set high, low, close to first close

# Extract and process the required data (high, low, close, volume)
coin_data = data[:, 1:]
coin_data[:, 3] = coin_data[:, 2] * coin_data[:, 3] # Use quote volume as volume
# Back-fill
if end_idx < n_timesteps:
unified_array[end_idx:, i, :3] = coin_data[-1, 2] # Set high, low, close to last close

return start_idx, end_idx, coin_data
return timestamps, unified_array


def convert_csv_to_npy(filepath):
Expand Down

0 comments on commit 982d871

Please sign in to comment.