生產者緩存
⑴ ActiveMQ是什麼是干什麼用的
1、ActiviMq消息隊列,解決了服務解耦合的動作,緩解了服務並發量很大,造成伺服器無法處理的狀況。(kafka、rabbitMQ、activiMQ)
其他作用:非同步處理、消息通訊、流量消峰、應用解耦
應用場景:
1、用戶注冊的時候,重點內容是將用戶信息保存到資料庫中,發簡訊驗證或者是發郵件增加了業務的復雜度。這時使用MQ將發簡訊、發郵件通知MQ由另外的服務平台完成。
2、搜索平台、緩存平台
查詢數據,建立緩存、索引,不從資料庫查詢,從緩存或者索引庫查詢,當資料庫發生增加、修改、刪除操作時發消息給MQ,緩存平台或者是索引平台從MQ獲取到這個消息,更新緩存或者索引。
ActiveMQ使用的是標準的生產者(完成生產消息並發送消息)和消費者(獲取消息,完成自己的業務邏輯)模型
有兩種數據結構
Topic(發布訂閱) 一個生產者對應多個消費者,消息默認不會持久化,需要手動配置持久化。如果A伺服器掛了,再生產一條消息的話,會被B伺服器拿去使用,就算重新啟動,A伺服器也不會再拿到消息了
商品系統、庫存系統、生成商品詳情頁面的系統,現在要添加一個商品信息,消息肯定是需要讓庫存系統以及商品信息詳情頁面系統知道的。
Queue(點對點)一個生產者對應一個消費者,默認消息持久化
StringMessage
mapMessage
byteMessage
objectMessage
要完成topic模式的消息持久化,需要保證每個消費者有唯一的clientID(本文來自北大青鳥)
⑵ 如何在C/C++程序中運用雙緩存雙線程等大規模數據處理的技巧 或者要怎麼做可以一次調入一塊數據進行處理
線程技術主要是用來並行處理一些任務,這些任務之間一般少有邏輯順序上的關聯,所以用線程技術可以提高程序整體的運行速度,特別在其中一些子線程運行速度有很大差距的情況下。
各類軟體使用緩存的方式都不一樣。雙緩存或者多個緩存、緩存池等等方式都有。關鍵在於你的程序需要使用怎樣的緩存結構。比如說你是類似生產者消費者模型的軟體,你也許會使用多個緩存做成隊列,一頭在不斷填充,一頭則不斷消耗,這樣能大大提高整體的數據吞吐速度。
fread不輸入整塊調入,它底層是使用的read之類的函數,對文件句柄進行操作。gets函數則是對指針指向的內存地址操作。這些都是上層邏輯了,離磁碟寄存器很遠。真正加快文件讀取速度的方法有很多,比如把整個文件映射到內存里,又比如跳過磁碟緩存直接大塊讀取內容。這些有的有專門的API函數可用,有的則需要你自己改寫系統底層代碼。
建議你多看看操作系統原理方面的書,可以去試著學習下linux內核代碼和原理,這樣你對這些問題就會有更深的認識。
希望這些建議能幫助你。
⑶ 鎿嶄綔緋葷粺淇″彿閲忔帶鍒墮棶棰橈紝緙撳瓨
涓銆侀棶棰樻弿榪
鐢熶駭鑰-娑堣垂鑰呴棶棰樻槸涓涓緇忓吀鐨勮繘紼嬪悓姝ラ棶棰橈紝璇ラ棶棰樻渶鏃╃敱Dijkstra鎻愬嚭錛岀敤浠ユ紨紺轟粬鎻愬嚭鐨勪俊鍙烽噺鏈哄埗銆
浠栬佹眰璁捐″湪鍚屼竴涓榪涚▼鍦板潃絀洪棿鍐呮墽琛岀殑涓や釜綰跨▼銆
鐢熶駭鑰呯嚎紼嬬敓浜х墿鍝侊紝鐒跺悗灝嗙墿鍝佹斁緗鍦ㄤ竴涓絀虹紦鍐插尯涓渚涙秷璐硅呯嚎紼嬫秷璐廣
娑堣垂鑰呯嚎紼嬩粠緙撳啿鍖轟腑鑾峰緱鐗╁搧錛岀劧鍚庨噴鏀劇紦鍐插尯銆
褰撶敓浜ц呯嚎紼嬬敓浜х墿鍝佹椂錛屽傛灉娌℃湁絀虹紦鍐插尯鍙鐢錛岄偅涔堢敓浜ц呯嚎紼嬪繀欏葷瓑寰呮秷璐硅呯嚎紼嬮噴鏀懼嚭涓涓絀虹紦鍐插尯銆
褰撴秷璐硅呯嚎紼嬫秷璐圭墿鍝佹椂錛屽傛灉娌℃湁婊$殑緙撳啿鍖猴紝閭d箞娑堣垂鑰呯嚎紼嬪皢琚闃誨烇紝鐩村埌鏂扮殑鐗╁搧琚鐢熶駭鍑烘潵銆
鍚岀悊 鏈闂棰樹篃鏄瑕佹眰璁捐″湪鍚屼竴涓榪涚▼鍦板潃絀洪棿鍐呮墽琛岀殑涓や釜綰跨▼銆
褰撹濺絝欏敭紲ㄥ巺杈懼埌20浜轟笂綰挎椂錛屽繀欏葷瓑鏈変漢璧板嚭杞︾珯鍞紲ㄥ巺錛岃濺絝欏敭紲ㄥ巺澶栫殑璐紲ㄨ呮墠鍙榪涘叆銆傚氨濡備笂闈
鐢熶駭鑰呯嚎紼嬬敓浜х墿鍝佹椂錛屽傛灉娌℃湁絀虹紦鍐插尯鍙鐢錛岄偅涔堢敓浜ц呯嚎紼嬪繀欏葷瓑寰呮秷璐硅呯嚎紼嬮噴鏀懼嚭涓涓絀虹紦鍐插尯銆傛ゆ椂鍘呭栬喘紲ㄨ呰繘鍏ュぇ鍘呰繖涓浜嬩歡鍙浠ョ湅鍋氭槸涓涓鐢熶駭鑰呯嚎紼嬶紝鑰屽巺鍐呰喘紲ㄨ呰蛋鍑哄巺澶栬繖涓浜嬩歡灝卞彲浠ョ湅鍋氭槸涓涓娑堣垂鑰呯嚎紼嬨傚傛ら棶棰樿繋鍒冭岃В銆
浜屻佸疄鐜頒唬鐮
#include <windows.h>
#include <iostream>
const unsigned short SIZE_OF_BUFFER = 10; //緙撳啿鍖洪暱搴
unsigned short ProctID = 0; //浜у搧鍙
unsigned short ConsumeID = 0; //灝嗚娑堣楃殑浜у搧鍙
unsigned short in = 0; //浜у搧榪涚紦鍐插尯鏃剁殑緙撳啿鍖轟笅鏍
unsigned short out = 0; //浜у搧鍑虹紦鍐插尯鏃剁殑緙撳啿鍖轟笅鏍
int g_buffer[SIZE_OF_BUFFER]; //緙撳啿鍖烘槸涓寰鐜闃熷垪
bool g_continue = true; //鎺у埗紼嬪簭緇撴潫
HANDLE g_hMutex; //鐢ㄤ簬綰跨▼闂寸殑浜掓枼
HANDLE g_hFullSemaphore; //褰撶紦鍐插尯婊℃椂榪浣跨敓浜ц呯瓑寰
HANDLE g_hEmptySemaphore; //褰撶紦鍐插尯絀烘椂榪浣挎秷璐硅呯瓑寰
DWORD WINAPI Procer(LPVOID); //鐢熶駭鑰呯嚎紼
DWORD WINAPI Consumer(LPVOID); //娑堣垂鑰呯嚎紼
int main()
{
//鍒涘緩鍚勪釜浜掓枼淇″彿
g_hMutex = CreateMutex(NULL,FALSE,NULL);
g_hEmptySemaphore = CreateSemaphore(NULL,0,SIZE_OF_BUFFER-1,NULL);
//璋冩暣涓嬮潰鐨勬暟鍊礆紝鍙浠ュ彂鐜幫紝褰撶敓浜ц呬釜鏁板氫簬娑堣垂鑰呬釜鏁版椂錛
//鐢熶駭閫熷害蹇錛岀敓浜ц呯粡甯哥瓑寰呮秷璐硅咃紱鍙嶄箣錛屾秷璐硅呯粡甯哥瓑寰
const unsigned short PRODUCERS_COUNT = 3; //鐢熶駭鑰呯殑涓鏁
const unsigned short CONSUMERS_COUNT = 1; //娑堣垂鑰呯殑涓鏁
//鎬葷殑綰跨▼鏁
const unsigned short THREADS_COUNT = PRODUCERS_COUNT+CONSUMERS_COUNT;
DWORD procerID[CONSUMERS_COUNT]; //鐢熶駭鑰呯嚎紼嬬殑鏍囪瘑絎
DWORD consumerID[THREADS_COUNT]; //娑堣垂鑰呯嚎紼嬬殑鏍囪瘑絎
//鍒涘緩鐢熶駭鑰呯嚎紼
for (int i=0;i<PRODUCERS_COUNT;++i){
hThreads[i]=CreateThread(NULL,0,Procer,NULL,0,&procerID[i]);
if (hThreads[i]==NULL) return -1;
}
//鍒涘緩娑堣垂鑰呯嚎紼
for (int i=0;i<CONSUMERS_COUNT;++i){
hThreads[PRODUCERS_COUNT+i]=CreateThread(NULL,0,Consumer,NULL,0,&consumerID[i]);
if (hThreads[i]==NULL) return -1;
}
while(g_continue){
if(getchar()){ //鎸夊洖杞﹀悗緇堟㈢▼搴忚繍琛
g_continue = false;
}
}
return 0;
}
//鐢熶駭涓涓浜у搧銆傜畝鍗曟ā鎷熶簡涓涓嬶紝浠呰緭鍑烘柊浜у搧鐨処D鍙
void Proce()
{
std::cerr << "Procing " << ++ProctID << " ... ";
std::cerr << "Succeed" << std::endl;
}
//鎶婃柊鐢熶駭鐨勪駭鍝佹斁鍏ョ紦鍐插尯
void Append()
{
std::cerr << "Appending a proct ... ";
g_buffer[in] = ProctID;
in = (in+1)%SIZE_OF_BUFFER;
std::cerr << "Succeed" << std::endl;
//杈撳嚭緙撳啿鍖哄綋鍓嶇殑鐘舵
for (int i=0;i<SIZE_OF_BUFFER;++i){
std::cout << i <<": " << g_buffer[i];
if (i==in) std::cout << " <-- 鐢熶駭";
if (i==out) std::cout << " <-- 娑堣垂";
std::cout << std::endl;
}
}
//浠庣紦鍐插尯涓鍙栧嚭涓涓浜у搧
void Take()
{
std::cerr << "Taking a proct ... ";
ConsumeID = g_buffer[out];
out = (out+1)%SIZE_OF_BUFFER;
std::cerr << "Succeed" << std::endl;
//杈撳嚭緙撳啿鍖哄綋鍓嶇殑鐘舵
for (int i=0;i<SIZE_OF_BUFFER;++i){
std::cout << i <<": " << g_buffer[i];
if (i==in) std::cout << " <-- 鐢熶駭";
if (i==out) std::cout << " <-- 娑堣垂";
std::cout << std::endl;
}
}
//娑堣椾竴涓浜у搧
void Consume()
{
std::cerr << "Consuming " << ConsumeID << " ... ";
std::cerr << "Succeed" << std::endl;
}
//鐢熶駭鑰
DWORD WINAPI Procer(LPVOID lpPara)
{
while(g_continue){
WaitForSingleObject(g_hFullSemaphore,INFINITE);
WaitForSingleObject(g_hMutex,INFINITE);
Proce();
Append();
Sleep(1500);
ReleaseMutex(g_hMutex);
ReleaseSemaphore(g_hEmptySemaphore,1,NULL);
}
return 0;
}
//娑堣垂鑰
DWORD WINAPI Consumer(LPVOID lpPara)
{
while(g_continue){
WaitForSingleObject(g_hEmptySemaphore,INFINITE);
WaitForSingleObject(g_hMutex,INFINITE);
Take();
Consume();
Sleep(1500);
ReleaseMutex(g_hMutex);
ReleaseSemaphore(g_hFullSemaphore,1,NULL);
}
return 0;
}
⑷ 線程池-參數篇:2.隊列
多線程環境中,通過隊列可以很容易實現線程間數據共享,比如經典的「生產者」和「消費者」模型中,通過隊列可以很便利地實現兩者之間的數據共享;同時作為BlockingQueue的使用者,我們不需要關心什麼時候需要阻塞線程,什麼時候需要喚醒線程,因為這一切BlockingQueue的實現者都給一手包辦了。
基於數組的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,另外還保存著兩個整形變數,分別標識著隊列的頭部和尾部在數組中的位置。
ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味著兩者無法真正並行運行,而在創建ArrayBlockingQueue時,我們還可以控制對象的內部鎖是否採用公平鎖,默認採用非公平鎖。
按照實現原理來分析,ArrayBlockingQueue完全可以採用分離鎖,從而實現生產者和消費者操作的完全並行運行。
基於鏈表的阻塞隊列,其內部也維持著一個數據緩沖隊列(由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區達到最大值緩存容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於同樣的原理。
對於生產者端和消費者端分別採用了獨立的鎖來控制數據同步,這也意味著在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的並發性能。
ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而後者則會生成一個額外的Node對象。這在長時間內需要高效並發地處理大批量數據的系統中,其對於GC的影響還是存在一定的區別。如果沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於罩伍消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。
ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞隊列,一般情況下,在處理多線程間的生產者消費者問題,使用這兩個類足以。
DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue是一個沒有大小限制的隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。
DelayQueue用於放置實現了Delayed介面的對象,其中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,即隊頭對象的延遲到期時間最長。注意:不能將null元素放置到這種隊列中。
Delayed 是一種混合風格的介面,用來標記那些應該在給定延遲時間之後執行的對象。Delayed擴展了Comparable介面,比較的基準為延時的時間值,Delayed介面的實現類getDelay的返回值應為固定宴悶沖值(final)。DelayQueue內部是使用PriorityQueue實現的。
考慮以下場景:
一種笨笨的辦法就是,使用一個後台線程,遍歷所有對象,挨個檢查。這種笨笨的辦法簡單好用,但是對象數量過多時,可能存在性能問題,檢查間隔時間不好設置,間隔時間過大,影響精確度,多小則存在效率問題。而且做不到按超時的時間順序處理。
這場景,使用DelayQueue最適合了,詳情查看 DelayedQueue學習筆記 ; 精巧好用的DelayQueue
基於優先順序的阻塞隊列(優先順序的判斷通過構造函數傳入的Compator對象來決定),需要注意PriorityBlockingQueue並不會阻塞數據生產者晌殲,而只會在沒有可消費的數據時,阻塞數據的消費者。
使用時,若生產者生產數據的速度快於消費者消費數據的速度,隨著長時間的運行,可能會耗盡所有的可用堆內存空間。在實現PriorityBlockingQueue時,內部控制線程同步的鎖採用的是公平鎖。
SynchronousQueue是一個內部只能包含零個元素的隊列。插入元素到隊列的線程被阻塞,直到另一個線程從隊列中獲取元素。同樣,如果線程嘗試獲取元素並且當前沒有線程在插入元素,則該線程將被阻塞,直到有線程將元素插入隊列
聲明一個SynchronousQueue有公平模式和非公平模式,區別如下:
參考: Java多線程-工具篇-BlockingQueue
12. SynchronousQueue