1+ # data_collector.py
2+ import asyncio
3+ import json
4+ import websockets
5+ import aiohttp
6+ import time
7+ from datetime import datetime
8+ from database import CryptoDatabase
9+
10+ class DataCollector :
11+ def __init__ (self , config_path = "config/api_config.json" ):
12+ """Inicializador del recolector de datos de criptomonedas"""
13+ self .db = CryptoDatabase ()
14+ self .active_streams = {}
15+ self .api_keys = {}
16+ self .exchanges = []
17+ self .load_config (config_path )
18+
19+ def load_config (self , config_path ):
20+ """Carga la configuración de APIs desde un archivo JSON"""
21+ with open (config_path , 'r' ) as file :
22+ config = json .load (file )
23+ self .api_keys = config .get ('api_keys' , {})
24+ self .exchanges = config .get ('exchanges' , [])
25+
26+ async def initialize_streams (self ):
27+ """Inicializa los streams de datos para todos los exchanges configurados"""
28+ tasks = []
29+ for exchange in self .exchanges :
30+ if exchange == 'binance' :
31+ tasks .append (self .connect_binance ())
32+ elif exchange == 'coinbase' :
33+ tasks .append (self .connect_coinbase ())
34+ # Agregar más exchanges según sea necesario
35+
36+ await asyncio .gather (* tasks )
37+
38+ async def connect_binance (self ):
39+ """Establece conexión con WebSocket de Binance para datos en tiempo real"""
40+ uri = "wss://stream.binance.com:9443/ws/!ticker@arr"
41+
42+ async with websockets .connect (uri ) as websocket :
43+ self .active_streams ['binance' ] = websocket
44+ while True :
45+ try :
46+ response = await websocket .recv ()
47+ data = json .loads (response )
48+
49+ # Procesamiento de los datos recibidos
50+ for ticker in data :
51+ symbol = ticker ['s' ]
52+ if symbol .endswith ('USDT' ): # Filtramos pares con USDT
53+ crypto = symbol [:- 4 ] # Eliminamos 'USDT' del final
54+ price = float (ticker ['c' ]) # Precio actual
55+ timestamp = int (time .time () * 1000 )
56+
57+ # Almacenamos en la base de datos
58+ await self .db .store_price_data ({
59+ 'exchange' : 'binance' ,
60+ 'symbol' : crypto ,
61+ 'price' : price ,
62+ 'timestamp' : timestamp ,
63+ 'volume_24h' : float (ticker ['v' ]),
64+ 'change_24h' : float (ticker ['p' ])
65+ })
66+
67+ except Exception as e :
68+ print (f"Error en Binance WebSocket: { e } " )
69+ # Reconexión tras error
70+ await asyncio .sleep (5 )
71+ return await self .connect_binance ()
72+
73+ async def connect_coinbase (self ):
74+ """Establece conexión con la API de Coinbase para datos en tiempo real"""
75+ uri = "wss://ws-feed.pro.coinbase.com"
76+
77+ # Configuración de la suscripción
78+ subscription = {
79+ "type" : "subscribe" ,
80+ "channels" : [{"name" : "ticker" , "product_ids" : ["BTC-USD" , "ETH-USD" , "SOL-USD" ]}]
81+ }
82+
83+ async with websockets .connect (uri ) as websocket :
84+ self .active_streams ['coinbase' ] = websocket
85+
86+ # Enviar mensaje de suscripción
87+ await websocket .send (json .dumps (subscription ))
88+
89+ while True :
90+ try :
91+ response = await websocket .recv ()
92+ data = json .loads (response )
93+
94+ if data .get ('type' ) == 'ticker' :
95+ product_id = data .get ('product_id' , '' )
96+ if '-USD' in product_id :
97+ crypto = product_id .split ('-' )[0 ]
98+ price = float (data .get ('price' , 0 ))
99+ timestamp = int (datetime .fromisoformat (data .get ('time' ).replace ('Z' , '+00:00' )).timestamp () * 1000 )
100+
101+ # Almacenamos en la base de datos
102+ await self .db .store_price_data ({
103+ 'exchange' : 'coinbase' ,
104+ 'symbol' : crypto ,
105+ 'price' : price ,
106+ 'timestamp' : timestamp ,
107+ 'volume_24h' : float (data .get ('volume_24h' , 0 )),
108+ 'change_24h' : 0 # Coinbase no proporciona este dato directamente
109+ })
110+
111+ except Exception as e :
112+ print (f"Error en Coinbase WebSocket: { e } " )
113+ # Reconexión tras error
114+ await asyncio .sleep (5 )
115+ return await self .connect_coinbase ()
116+
117+ async def fetch_rest_data (self ):
118+ """Obtiene datos adicionales a través de APIs REST para exchanges que no soportan WebSockets"""
119+ while True :
120+ for exchange in self .exchanges :
121+ if exchange == 'kraken' :
122+ await self .fetch_kraken_data ()
123+ # Agregar más exchanges según sea necesario
124+
125+ # Actualizamos cada 30 segundos para APIs REST
126+ await asyncio .sleep (30 )
127+
128+ async def fetch_kraken_data (self ):
129+ """Obtiene datos de precios desde la API REST de Kraken"""
130+ url = "https://api.kraken.com/0/public/Ticker"
131+ params = {"pair" : "BTCUSD,ETHUSD,SOLUSD" }
132+
133+ async with aiohttp .ClientSession () as session :
134+ try :
135+ async with session .get (url , params = params ) as response :
136+ if response .status == 200 :
137+ data = await response .json ()
138+ result = data .get ('result' , {})
139+
140+ # Mapeamos los pares de Kraken a símbolos estándar
141+ mapping = {
142+ 'XXBTZUSD' : 'BTC' ,
143+ 'XETHZUSD' : 'ETH' ,
144+ 'SOLUSD' : 'SOL'
145+ }
146+
147+ for pair , info in result .items ():
148+ if pair in mapping :
149+ crypto = mapping [pair ]
150+ price = float (info ['c' ][0 ]) # Precio actual
151+ timestamp = int (time .time () * 1000 )
152+
153+ # Almacenamos en la base de datos
154+ await self .db .store_price_data ({
155+ 'exchange' : 'kraken' ,
156+ 'symbol' : crypto ,
157+ 'price' : price ,
158+ 'timestamp' : timestamp ,
159+ 'volume_24h' : float (info ['v' ][1 ]), # Volumen 24h
160+ 'change_24h' : 0 # Calcularlo manualmente si es necesario
161+ })
162+ except Exception as e :
163+ print (f"Error obteniendo datos de Kraken: { e } " )
164+
165+ async def run (self ):
166+ """Ejecuta todos los recolectores de datos en paralelo"""
167+ await asyncio .gather (
168+ self .initialize_streams (),
169+ self .fetch_rest_data ()
170+ )
171+
172+ if __name__ == "__main__" :
173+ collector = DataCollector ()
174+ asyncio .run (collector .run ())
0 commit comments